# Partial Exam of Spark

@roman

20 apr 2024

---
# Settings

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, lit
from pyspark.sql.window import Window

In [None]:
# Build the Spark session used throughout the notebook (app name 'QPP')
spark = (
    SparkSession.builder
    .appName('QPP')
    .getOrCreate()
)

In [None]:
# S3 location of the dataset: bucket name is derived from NAME,
# FOLDER is the prefix inside the bucket, NAME_FILE the base file name
NAME = 'roman'
FOLDER = 'qqp'
NAME_FILE = 'qqp'
BUCKET = f"s3://itam-analytics-{NAME}"

# catalog value used to filter the data in section S2
CATALOG_TYPE = 'medicamentos'

---
# Data

In [None]:
# Load the raw CSV from S3; the first row is the header and column types are inferred
df_qqp = spark.read.csv(
    f"{BUCKET}/{FOLDER}/{NAME_FILE}.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Print the schema (column names and types) of the DataFrame loaded above
df_qqp.printSchema()

In [None]:
# Persist the data to S3 as parquet, partitioned by the `catalog` column;
# 'overwrite' replaces whatever already exists at the target path
(
    df_qqp
    .write
    .mode('overwrite')
    .partitionBy('catalog')
    .parquet(f"{BUCKET}/{FOLDER}/{NAME_FILE}")
)

---
# Questions

## S1: General

### Get Data

In [None]:
# Reload the dataset from its parquet copy on S3
df_qqp = spark.read.parquet(
    f"{BUCKET}/{FOLDER}/{NAME_FILE}"
)

# Show column names and types
df_qqp.printSchema()

In [None]:
# Add a `year` column extracted from `created_at`
df_qqp = df_qqp.withColumn(
    'year',
    F.year(F.col('created_at')),
)

In [None]:
# Number of rows in each year
(
    df_qqp
    .groupBy('year')
    .count()
    .show()
)

### Q1: ¿Cuántos catálogos diferentes tenemos?

In [None]:
# Number of distinct catalog values over the whole dataset
(
    df_qqp
    .agg(F.countDistinct('catalog'))
    .show()
)

In [None]:
# Number of distinct catalog values within each year
(
    df_qqp
    .groupBy('year')
    .agg(F.countDistinct('catalog'))
    .show()
)

En total hay 12 catálogos diferentes, sin embargo no todos los catálogos están presentes en todos los años. Por ejemplo, en 2018 solo hay 10 catálogos diferentes.

### Q2: ¿Cuáles son los 20 catálogos con más observaciones?

In [None]:
# Rank catalogs by number of rows inside each year (rank 1 = most rows)
rank_catalog_within_year = Window.partitionBy('year').orderBy(F.desc('count'))

# Row count per (year, catalog), keeping at most the 20 largest catalogs of every year
table_count_catalogue = (
    df_qqp
    .groupBy('year', 'catalog')
    .count()
    .orderBy('year', 'catalog')
    .withColumn('rank', F.row_number().over(rank_catalog_within_year))
    .filter(F.col('rank') <= 20)
)
table_count_catalogue.show()

### Q3: ¿Tenemos datos de todos los estados del país? De no ser así, ¿cuáles faltan?

In [None]:
# Distinct values of each dimension present anywhere in the data
distinct_catalogs = df_qqp.select('catalog').distinct()
distinct_states = df_qqp.select('state').distinct()
distinct_years = df_qqp.select('year').distinct()

# Every possible (catalog, state, year) combination
df_cross_join_state_year_catalog = (
    distinct_catalogs
    .crossJoin(distinct_states)
    .crossJoin(distinct_years)
)

# Row count for the (catalog, state, year) combinations that actually occur
table_count_num_states_per_year_catalog = (
    df_qqp
    .groupBy('catalog', 'state', 'year')
    .count()
    .orderBy('catalog', 'state', 'year')
)

# Combinations with no observed rows: after the left join their `count` is null
table_missing = (
    df_cross_join_state_year_catalog
    .join(
        table_count_num_states_per_year_catalog,
        on=['catalog', 'state', 'year'],
        how='left',
    )
    .filter(F.col('count').isNull())
)

# Report how many combinations are missing
num_missing = table_missing.count()
print(f"Missing: {num_missing}")

In [None]:
# Preview the first rows of table_missing
table_missing.show()

No hay datos para todos los estados. Faltan 2 estados a partir del 2021.

In [None]:
# Distinct state names, sorted alphabetically
table_states = (
    df_qqp
    .select('state')
    .distinct()
    .orderBy('state')
)

# Row count for every (state, year) pair
table_count_states = (
    df_qqp
    .groupBy('state', 'year')
    .count()
    .orderBy('state', 'year')
)
table_count_states.show()

In [None]:
# Presence matrix: one row per state, one column per year,
# 1 when the state has rows in that year and 0 when it has none.
# table_count_states only contains (state, year) pairs that exist, so the
# pivot leaves absent pairs as null; they are turned into 0 afterwards.
# (Coalescing two literals inside the aggregation always yields 1 and can
# never produce the 0, hence the explicit fill.)
(
    table_count_states
    .groupBy('state')
    .pivot('year')
    .agg(F.max(F.lit(1)))
    .na.fill(0)
    .orderBy('state')
    .show(32)
)

La tabla anterior muestra los estados que no tienen información en ciertos años (¿es esta la pregunta?).

### Q4: ¿Cuántas observaciones tenemos por estado?


In [None]:
# Number of rows for every (state, catalog, year) combination (WIP)
table_qqp_state_catalog_year = (
    df_qqp
    .groupBy('state', 'catalog', 'year')
    .count()
    .sort('state', 'catalog', 'year')
)

table_qqp_state_catalog_year.show()

### Q5: De cada estado obtén el número de catálogos diferentes por año. ¿Ha aumentado el número de catálogos con el tiempo?

In [None]:
# Distinct catalogs observed for every (state, year) pair
catalogs_per_state_year = (
    df_qqp
    .groupBy('state', 'year')
    .agg(F.countDistinct('catalog').alias('count_catalogs'))
    .orderBy('state', 'year')
)

# Reshape to one row per state with one column per year
table_qqp_year_state_diff_catalog = (
    catalogs_per_state_year
    .groupBy('state')
    .pivot('year')
    .agg(F.first('count_catalogs'))
    .orderBy('state')
)
table_qqp_year_state_diff_catalog.show(32)

Falta observar tendencias en el tiempo para afirmar si hay un crecimiento en el número de catálogos por estado

### Save tables

In [None]:
# Write the S1 result tables to S3 as parquet, replacing previous outputs
(
    table_count_catalogue
    .write
    .mode('overwrite')
    .parquet(f"{BUCKET}/{FOLDER}/ouputs/all/table_count_catalogue")
)
(
    table_missing
    .write
    .mode('overwrite')
    .parquet(f"{BUCKET}/{FOLDER}/ouputs/all/table_missing")
)
(
    table_qqp_state_catalog_year
    .write
    .mode('overwrite')
    .parquet(f"{BUCKET}/{FOLDER}/ouputs/all/table_qqp_state_catalog_year")
)
(
    table_qqp_year_state_diff_catalog
    .write
    .mode('overwrite')
    .parquet(f"{BUCKET}/{FOLDER}/ouputs/all/table_qqp_year_state_diff_catalog")
)

## S2: Particular

### Get Data

In [None]:
# Keep only the rows whose catalog equals CATALOG_TYPE
df_qqp_our_category = df_qqp.where(
    F.col('catalog') == CATALOG_TYPE
)

# Number of rows in the selected catalog
df_qqp_our_category.count()

### Q1: ¿Cuántas marcas diferentes tiene tu categoría?

In [None]:
# Number of distinct `brand` values in the selected catalog
(
    df_qqp_our_category
    .select('brand')
    .distinct()
    .count()
)

### Q2: ¿Cuál es la marca con mayor precio? ¿En qué estado?

In [None]:
# Single row with the highest price: shows its brand, state and price
(
    df_qqp_our_category
    .sort(F.col('price').desc())
    .select('brand', 'state', 'price')
    .show(1)
)

### Q3: ¿Cuál es la marca con menor precio en CDMX? (en aquel entonces Distrito Federal)


In [None]:
# Brand with the LOWEST price in CDMX: sort ascending so the cheapest row
# comes first (a descending sort would return the most expensive one).
# Null prices are pushed to the end so they cannot be picked as the minimum.
# NOTE(review): confirm the state label used in the data for Mexico City
# ('ciudad de mexico' vs. 'distrito federal').
(
    df_qqp_our_category
    .filter(col('state') == 'ciudad de mexico')
    .orderBy(col('price').asc_nulls_last())
    .select('brand', 'state', 'price').show(1)
    )

### Q4: ¿Cuál es la marca con mayores observaciones?

In [None]:
# Number of rows per brand, largest first
table_brand_count = (
    df_qqp_our_category
    .groupBy('brand')
    .count()
    .orderBy(F.desc('count'))
)
table_brand_count.show()

### Q5: ¿Cuáles son el top 5 de marcas con mayor precio en cada estado? ¿Son diferentes?

In [None]:
# Rank brands inside each state by their maximum price (rank 1 = most expensive)
rank_price_within_state = Window.partitionBy('state').orderBy(F.desc('max_price'))

# Maximum price per (state, brand), keeping the 5 highest-priced brands of each state (WIP)
table_price_brand_top5 = (
    df_qqp_our_category
    .groupBy('state', 'brand')
    .agg(F.max('price').alias('max_price'))
    .orderBy(F.col('max_price').desc())
    .withColumn('rank', F.row_number().over(rank_price_within_state))
    .filter(F.col('rank') <= 5)
)


### Q6: ¿Cuáles son el top 5 de marcas con menor precio en CDMX? (en aquel entonces Distrito Federal)

In [None]:
# Rows of the selected catalog whose state is 'ciudad de mexico'
rows_cdmx = df_qqp_our_category.where(F.col('state') == 'ciudad de mexico')

# Minimum price per brand in CDMX; show the 5 brands with the lowest minimum
(
    rows_cdmx
    .groupBy('brand')
    .agg(F.min('price').alias('min_price'))
    .orderBy(F.col('min_price').asc())
    .select('brand', 'min_price')
    .show(5)
)

### Q7: ¿Cuáles son el top 5 de marcas con mayores observaciones? ¿Se parecen a las de nivel por estado?

In [None]:
# The 5 brands with the most rows across all states (WIP)
(
    df_qqp_our_category
    .groupBy('brand')
    .count()
    .orderBy(F.desc('count'))
    .show(5)
)

In [None]:
# Number of rows for every (state, brand) pair.
# Kept as its own named table because the Save section writes
# `table_brand_state_count` to S3 and would otherwise hit an undefined name.
table_brand_state_count = (
    df_qqp_our_category
    .groupBy('state', 'brand')
    .count()
    )

# Top 5 brands per state: rank brands by row count inside each state
# (rank 1 = most rows) and keep ranks 1 to 5
table_brand_state_count_top5 = (
    table_brand_state_count
    .withColumn('rank', F.row_number().over(
        Window.partitionBy('state').orderBy(F.desc('count'))
        ))
    .filter('rank <= 5')
    )

### Q8: ¿Ha dejado de existir alguna marca durante los años que tienes? ¿Cuál? ¿Cuándo desapareció?

In [None]:
# Number of rows for every (year, brand) pair, sorted by year and then by count (WIP)
table_brand_year_count = (
    df_qqp_our_category
    .groupBy('year', 'brand')
    .count()
    .sort('year', 'count')
)

### Q9: Genera una gráfica de serie de tiempo por estado para la marca con mayor precio -en todos los años-, donde el eje equis es el año y el eje ye es el precio máximo.

In [None]:
# Maximum price observed for every (state, year) pair (WIP)
table_state_year_max_price = (
    df_qqp_our_category
    .groupBy('state', 'year')
    .agg(F.max('price').alias('max_price'))
    .sort('state', 'year', F.col('max_price').desc())
)

table_state_year_max_price.show()

## Save

In [None]:
# Save the S2 result tables to S3 as parquet, replacing previous outputs.
# NOTE(review): `table_brand_state_count` must be defined by an earlier cell
# before this cell runs.
table_brand_count.write.mode('overwrite').parquet(f"{BUCKET}/{FOLDER}/ouputs/brand_count")
table_price_brand_top5.write.mode('overwrite').parquet(f"{BUCKET}/{FOLDER}/ouputs/price_brand_top5")
table_brand_state_count.write.mode('overwrite').parquet(f"{BUCKET}/{FOLDER}/ouputs/brand_state_count")
# The per-state top-5 table goes to a path named after its content
# (it was previously written under a 'brand_year_count_top5' label).
table_brand_state_count_top5.write.mode('overwrite').parquet(f"{BUCKET}/{FOLDER}/ouputs/brand_state_count_top5")
# The per-year brand counts are computed for Q8 and are saved here as well.
table_brand_year_count.write.mode('overwrite').parquet(f"{BUCKET}/{FOLDER}/ouputs/brand_year_count")
table_state_year_max_price.write.mode('overwrite').parquet(f"{BUCKET}/{FOLDER}/ouputs/state_year_max_price")
