## Imports

In [None]:
import pyspark.sql.functions as F
from dhzlib import DhzLib

## Teste de Conexão - Banco MySQL

In [None]:
# Instantiate the project helper; later cells use its Spark session (`dhz.spark`)
# and its MySQL helpers (`read_mysql` / `write_mysql`).
# Run DhzLib's MySQL connection test before doing any I/O.
dhz = DhzLib()
dhz.test_connection_mysql()

## Lendo arquivos CSV e escrevendo dados no Banco MySQL

In [None]:
# Read every CSV under ../data (first line is the header, column types are
# inferred by Spark) and land it in the raw MySQL table. The final expression
# (row count) is the cell's displayed output.
df = dhz.spark.read.options(header=True, inferSchema=True).csv('../data/*.csv')
dhz.write_mysql(df, 'raw_capital_bikeshare')
df.count()

## Criando tabela na camada trusted

In [None]:
# Read the raw table back from MySQL to inspect the types/values as stored.
raw_df = dhz.read_mysql('raw_capital_bikeshare')

In [None]:
# Show the column types of the raw table as returned by the MySQL read.
raw_df.printSchema()

In [None]:
# Side-by-side preview: the stored `started_at` next to its conversion to
# America/Sao_Paulo local time (first 10 rows).
# NOTE(review): from_utc_timestamp treats the input as UTC — confirm that the
# source timestamps really are UTC.
(
    raw_df
    .select(
        'started_at',
        F.from_utc_timestamp(
            F.col('started_at').cast('timestamp'),
            'America/Sao_Paulo',
        ).alias('started_at_sp'),
    )
    .limit(10)
    .show()
)

In [None]:
# Trusted layer: typed copy of the raw table with timestamps converted from UTC
# to America/Sao_Paulo and station ids cast to int.
# Built from `raw_df` (the raw MySQL table read above), not from `df`. The
# global `df` is rebound by later cells (the statistics loop reads
# `tr_capital_bikeshare` into it), so re-running this cell would silently
# build "trusted" from the already-converted trusted table.
# NOTE(review): from_utc_timestamp assumes the stored timestamps are UTC — confirm.
trusted_df = raw_df.select(
    F.col('ride_id'),
    F.col('rideable_type'),
    F.from_utc_timestamp(F.col('started_at').cast('timestamp'), 'America/Sao_Paulo').alias('started_at'),
    F.from_utc_timestamp(F.col('ended_at').cast('timestamp'), 'America/Sao_Paulo').alias('ended_at'),
    F.col('start_station_name'),
    F.col('start_station_id').cast('int').alias('start_station_id'),
    F.col('end_station_name'),
    F.col('end_station_id').cast('int').alias('end_station_id'),
)

In [None]:
# Persist the trusted-layer DataFrame to MySQL as `tr_capital_bikeshare`.
# NOTE(review): the write mode (overwrite/append) is decided inside
# DhzLib.write_mysql — confirm that re-running this cell does not duplicate rows.
dhz.write_mysql(trusted_df, 'tr_capital_bikeshare')

## Gerando estatísticas para as tabelas

In [None]:
from functools import reduce

from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType, DoubleType

# Table to profile -> timestamp column whose calendar day buckets the rows.
tables = {
    'raw_capital_bikeshare': 'started_at',
    'tr_capital_bikeshare': 'started_at'
}

# Declared layout of the statistics table. Every per-table row is cast to it,
# which also fixes the column order that the positional union below relies on.
schema = StructType([
    StructField('table', StringType(), True),
    StructField('date_column', StringType(), True),
    StructField('min_date', DateType(), True),
    StructField('max_date', DateType(), True),
    StructField('total_rows', LongType(), True),
    StructField('avg_rows_per_day', DoubleType(), True),
    StructField('stddev_rows_per_day', DoubleType(), True),
    StructField('min_rows_per_day', LongType(), True),
    StructField('max_rows_per_day', LongType(), True),
    StructField('stats_date', DateType(), True)
])


def _table_stats(table, column, schema):
    """Return a one-row Spark DataFrame of daily row-count statistics.

    Args:
        table: MySQL table name, read through ``dhz.read_mysql``.
        column: column whose value, cast to ``date``, defines the day of a row.
        schema: target ``StructType``; the result is projected and cast to it.

    Rows whose ``column`` is NULL (or does not cast to a date) are dropped
    before grouping. Otherwise they form an extra NULL "day" with a count of 0,
    which skews avg/stddev/min per day. ``total_rows`` is the sum of the daily
    counts, i.e. the number of rows that have a valid date.
    """
    daily = (
        dhz.read_mysql(table)
        .select(F.col(column).cast('date').alias('date_'))
        .where(F.col('date_').isNotNull())
        .groupBy('date_')
        .agg(F.count('*').alias('rows'))
    )
    stats = daily.select(
        F.lit(table).alias('table'),
        F.lit(column).alias('date_column'),
        F.min('date_').alias('min_date'),
        F.max('date_').alias('max_date'),
        F.sum('rows').alias('total_rows'),
        F.avg('rows').alias('avg_rows_per_day'),
        F.stddev('rows').alias('stddev_rows_per_day'),
        F.min('rows').alias('min_rows_per_day'),
        F.max('rows').alias('max_rows_per_day'),
        # A date (not a timestamp), matching the declared DateType of stats_date.
        F.current_date().alias('stats_date'),
    )
    return stats.select([F.col(field.name).cast(field.dataType) for field in schema.fields])


# union() is the non-deprecated name of unionAll(); both match columns by position.
stats_df = reduce(
    lambda left, right: left.union(right),
    [_table_stats(table, column, schema) for table, column in tables.items()],
)
    

In [None]:
# Check the column order and types of the combined statistics DataFrame.
stats_df.printSchema()

## RASCUNHO

In [None]:
# Draft: compute the statistics row for a single table and append it to stats_df.
schema = StructType([
    StructField('table', StringType(), True),
    StructField('date_column', StringType(), True),
    StructField('min_date', DateType(), True),
    StructField('max_date', DateType(), True),
    StructField('total_rows', LongType(), True),
    StructField('avg_rows_per_day', DoubleType(), True),
    StructField('stddev_rows_per_day', DoubleType(), True),
    StructField('min_rows_per_day', LongType(), True),
    StructField('max_rows_per_day', LongType(), True),
    StructField('stats_date', DateType(), True)
])

table = 'tr_capital_bikeshare'
column = 'started_at'
df = dhz.read_mysql(table)
# Rows per calendar day. NULL / uncastable dates are dropped first; otherwise
# they would form a NULL "day" with a count of 0 that skews avg/stddev/min.
group_df = (
    df.select(F.col(column).cast('date').alias('date_'))
    .where(F.col('date_').isNotNull())
    .groupBy('date_')
    .agg(F.count('*').alias('rows'))
)
stats = group_df.select(
    F.lit(table).alias('table'),
    F.lit(column).alias('date_column'),
    F.min('date_').alias('min_date'),
    F.max('date_').alias('max_date'),
    F.sum('rows').alias('total_rows'),
    F.avg('rows').alias('avg_rows_per_day'),
    F.stddev('rows').alias('stddev_rows_per_day'),
    F.min('rows').alias('min_rows_per_day'),
    F.max('rows').alias('max_rows_per_day'),
    # A date (not a timestamp), matching the declared DateType of stats_date.
    F.current_date().alias('stats_date'),
)

stats.show()
# Enforce the declared schema on the Spark side instead of round-tripping
# through pandas (toPandas collects to the driver and re-infers dtypes).
stats = stats.select([F.col(field.name).cast(field.dataType) for field in schema.fields])
# stats_df is either the initial placeholder or a DataFrame from an earlier cell.
if not stats_df:
    stats_df = stats
else:
    stats_df = stats_df.union(stats)

In [None]:
# Sanity check: confirm `stats` is a Spark DataFrame (not a pandas frame).
type(stats)

In [None]:
# Put the draft statistics columns in their canonical order, then display the
# dtypes pandas assigns to each column after conversion.
stats = stats.select(*[
    F.col(name)
    for name in (
        'table',
        'date_column',
        'min_date',
        'max_date',
        'total_rows',
        'avg_rows_per_day',
        'stddev_rows_per_day',
        'min_rows_per_day',
        'max_rows_per_day',
        'stats_date',
    )
])
stats.toPandas().dtypes

## RASCUNHO

In [None]:
import os
from dotenv import load_dotenv
# Populate the process environment from a .env file (python-dotenv), so that
# the os.getenv("MYSQL_*") lookups in the next cells can find the credentials.
load_dotenv()

In [None]:
# Disabled: the Spark session is provided by DhzLib as `dhz.spark`.
# While this stays commented out, no global `spark` is defined in this notebook.
# spark = SparkSession.builder \
#         .appName('Connecting to MySQL') \
#         .getOrCreate()

In [None]:

# JDBC connection settings taken from the environment. Each value is None when
# the corresponding variable is unset.
url = os.getenv("MYSQL_URL")
mysql_properties = dict(
    user=os.getenv("MYSQL_USER"),
    password=os.getenv("MYSQL_PASSWORD"),
)

In [None]:
# Ad-hoc read of the CSV files with header only (no schema inference, so every
# column is read as a string). Uses the DhzLib session: the bare `spark` name is
# never defined in this notebook (its builder cell is commented out), so
# `spark.read` raised NameError.
teste = dhz.spark.read.csv('../data/*.csv', header=True)

In [None]:
# Row count of the ad-hoc CSV read, for comparison with the raw load.
teste.count()

In [None]:
# Show the Spark session time zone. Spark uses it when casting strings to
# timestamps and when displaying timestamp values, so it matters when reading
# the from_utc_timestamp results.
dhz.spark.conf.get("spark.sql.session.timeZone")