In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
import time
spark = SparkSession.builder.getOrCreate()

schema_local_system = 'id STRING, first_name STRING, last_name STRING, email STRING, gender STRING, age INT, plan STRING, enrollment_in STRING'
df_gympass_customer = spark.read.option("header","true").csv('./data/fpfgym/gympass_customer.csv')
df_gympass_customer.write.format('delta').save('s3a://datalake-poslabs/bronze/gympass_customer')
s = time.time()
df_local_system_customer = spark.read.option("header","true").schema(schema_local_system).csv('./data/fpfgym/local_system_customer.csv')
df_local_system_customer.write.format('delta').save('s3a://datalake-poslabs/bronze/local_system_customer')
e = time.time()
print(e-s)

0.8739902973175049


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, date_format, when, lit, broadcast, monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()
df_customer_gympass = spark.read.format("delta").load('s3a://datalake-poslabs/bronze/gympass_customer')
df_plans = df_customer_gympass.select(col('plano').alias('plano_nome')).distinct()
df_plans = df_plans.withColumn('id_plano', monotonically_increasing_id()+1)
df_plans.write.format('delta').save('s3a://datalake-poslabs/silver/plans')

In [79]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, date_format, when, lit, broadcast, monotonically_increasing_id, lit
spark = SparkSession.builder.getOrCreate()
df_plans = spark.read.format("delta").load('s3a://datalake-poslabs/silver/plans')
df_customer_gympass = spark.read.format("delta").load('s3a://datalake-poslabs/bronze/gympass_customer')
df_local_system_customer = spark.read.format("delta").load('s3a://datalake-poslabs/bronze/local_system_customer')
df_local_system_customer = df_local_system_customer\
                           .withColumnRenamed('first_name', 'nome')\
                           .withColumnRenamed('last_name', 'sobrenome')\
                           .withColumnRenamed('gender', 'genero')\
                           .withColumnRenamed('age', 'idade')\
                           .withColumnRenamed('enrollment_in', 'matriculado_em')\
                           .withColumnRenamed('plan', 'plano')

df_local_system_customer = df_local_system_customer\
                           .withColumn('telefone', lit('Não definido'))\
                           .withColumn('endereco', lit('Não definido'))

df_local_system_customer = df_local_system_customer.withColumn('fonte', lit('sistema_local'))
df_customer_gympass = df_customer_gympass.withColumn('fonte', lit('gympass'))

df_union = df_local_system_customer.unionByName(df_customer_gympass)
df_union = df_union.withColumn('matriculado_em', to_date(col('matriculado_em'), 'M/d/yyyy').cast('date'))
df_union = df_union.withColumn('email', when(df_union.email.rlike(r'([A-Za-z0-9]+[.-_])*[A-Za-z0-9]+@[A-Za-z0-9-]+(\.[A-Z|a-z]{2,})+'), col('email')).otherwise(lit('Não definido')))
df_plans = spark.read.format("delta").load('s3a://datalake-poslabs/silver/plans')
df_union = df_union.join(broadcast(df_plans), df_union['plano'] == df_plans['plano_nome'], 'inner')
df_union = df_union.drop('plano_nome', 'plano')
df_union.write.format('delta').save('s3a://datalake-poslabs/silver/customer')
print(df_union.count())

2000


In [80]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, date_format, when, lit, broadcast, monotonically_increasing_id, month, year

spark = SparkSession.builder.getOrCreate()
df_customer_silver = spark.read.format("delta").load('s3a://datalake-poslabs/silver/customer')

df_customer_silver = df_customer_silver.withColumn('matriculado_em', to_date(col('matriculado_em'), 'M/d/yyyy').cast('date'))
df_customer_silver.show()
df_customer_silver = df_customer_silver.groupBy(year(col('matriculado_em')).alias('year'), month(col('matriculado_em')).alias('month')).agg({'id': 'count'})
df_customer_silver = df_customer_silver.orderBy('year', 'month')
df_customer_silver.show()

+----------+--------+----------+--------------------+-----------+-----+--------------+----------+--------------------+-------+--------+
|        id|    nome| sobrenome|               email|     genero|idade|matriculado_em|  telefone|            endereco|  fonte|id_plano|
+----------+--------+----------+--------------------+-----------+-----+--------------+----------+--------------------+-------+--------+
|80-5437854|  Lenard|    Pobjay|        Não definido|       Male|   24|    2022-02-14|3176236246| 2 Mockingbird Point|gympass|       2|
|77-0899770|  Cherri|     Sames|      csames1@hp.com|     Female|   15|    2022-04-04|4048434801|  73 Butternut Drive|gympass|       2|
|05-7286357| Cheston|    Keeler|ckeeler2@t-online.de|       Male|   81|    2022-06-19|9007954668|    71506 Boyd Plaza|gympass|       1|
|01-3984300|    Moll|   Merrill|mmerrill3@shareas...|     Female|   30|    2022-11-05|8888324782|                null|gympass|       2|
|96-1408728|   Cindi|     Dunne|   cdunne4@globo

In [81]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, date_format, when, lit, broadcast, monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()
df_customer_silver = spark.read.format("delta").load('s3a://datalake-poslabs/silver/customer')

df_customer_silver = df_customer_silver.withColumn('matriculado_em', to_date(col('matriculado_em'), 'M/d/yyyy').cast('date'))
df_customer_silver = df_customer_silver.groupBy(col('fonte')).agg({'id': 'count'})
df_customer_silver.show()

+----------+--------+----------+--------------------+-----------+-----+--------------+----------+--------------------+-------+--------+
|        id|    nome| sobrenome|               email|     genero|idade|matriculado_em|  telefone|            endereco|  fonte|id_plano|
+----------+--------+----------+--------------------+-----------+-----+--------------+----------+--------------------+-------+--------+
|80-5437854|  Lenard|    Pobjay|        Não definido|       Male|   24|    2022-02-14|3176236246| 2 Mockingbird Point|gympass|       2|
|77-0899770|  Cherri|     Sames|      csames1@hp.com|     Female|   15|    2022-04-04|4048434801|  73 Butternut Drive|gympass|       2|
|05-7286357| Cheston|    Keeler|ckeeler2@t-online.de|       Male|   81|    2022-06-19|9007954668|    71506 Boyd Plaza|gympass|       1|
|01-3984300|    Moll|   Merrill|mmerrill3@shareas...|     Female|   30|    2022-11-05|8888324782|                null|gympass|       2|
|96-1408728|   Cindi|     Dunne|   cdunne4@globo

In [82]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, date_format, when, lit, broadcast, monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()
df_customer_silver = spark.read.format("delta").load('s3a://datalake-poslabs/silver/customer')
# Defina os limites das faixas de idade (de 10 em 10)
age_lower_bound = 10
age_upper_bound = 100
age_step = 10

# Crie uma expressão para calcular a faixa de idade correspondente
age_group_expr = ((col("idade") // age_step) * age_step).alias("faixa_de_idade")

# Agrupe os dados pela faixa de idade
df_customer_silver = df.groupBy(age_group_expr).count()

# Mostre o DataFrame resultante
df_customer_silver.show()

TypeError: unsupported operand type(s) for //: 'Column' and 'int'