## Silver Layer Transform

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Configure OAuth (service-principal client credentials) access to the
# fdaprojectlake ADLS Gen2 account for this Spark session.
#
# The client secret is fetched from a Databricks secret scope at run time so it
# never appears in the notebook source or its revision history.
# NOTE(review): the scope and key names below are placeholders — point them at
# the workspace's real secret scope before running. The secret that used to be
# hardcoded in this cell must be treated as leaked and rotated.
_adls_client_secret = dbutils.secrets.get(scope="fda-project", key="adls-client-secret")

spark.conf.set("fs.azure.account.auth.type.fdaprojectlake.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.fdaprojectlake.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.fdaprojectlake.dfs.core.windows.net", "3567620e-6cef-48d1-b87d-57a409167baf")
spark.conf.set("fs.azure.account.oauth2.client.secret.fdaprojectlake.dfs.core.windows.net", _adls_client_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.fdaprojectlake.dfs.core.windows.net", "https://login.microsoftonline.com/f823ed38-24ed-4a95-9b09-de29523d7729/oauth2/token")

In [0]:
# Input JSON in the `bronze` container of the fdaprojectlake storage account.
SOURCE = (
    'abfss://bronze@fdaprojectlake.dfs.core.windows.net'
    '/food/event/food-event-0001-of-0001.json.zip'
    '/food-event-0001-of-0001.json'
)
# Output location in the `silver` container of the same account.
DEST = (
    'abfss://silver@fdaprojectlake.dfs.core.windows.net'
    '/food/event/food-event-0001-of-0001.parquet.snappy'
)

In [0]:
# Load the bronze file with the reader's "multiline" option enabled, i.e. the
# JSON document may span several lines instead of being one record per line.
df = (
    spark.read
    .option("multiline", "true")
    .json(SOURCE)
)

In [0]:
# Flatten the nested payload into the `consumer` table.
#
# Step 1: one row per element of the top-level `results` array, tagged with a
# generated `record_id` so a report can still be counted once after the later
# explodes multiply its rows.
raw = df.select(explode(col('results')).alias('raw'))
raw = raw.withColumn('record_id', monotonically_increasing_id())

# Step 2: explode `products` ONCE, as whole structs. Exploding the per-field
# arrays (`raw.products.industry_code`, `raw.products.name_brand`, ...) one
# after another would emit every combination of those values and pair one
# product's industry code with another product's brand/role.
# NOTE(review): explode() drops rows whose array is null or empty; switch to
# explode_outer() if reports without products/outcomes/reactions must be kept.
raw = raw.withColumn('product', explode(col('raw.products')))

# Step 3: project the output columns (same names and order as before).
consumer = raw.select('record_id',
                      'raw.report_number',
                      col('raw.consumer.age').cast(DecimalType()).alias('age'),
                      'raw.consumer.gender',
                      'product.industry_code',
                      initcap(col('product.industry_name')).alias('industry_name'),
                      'product.name_brand',
                      'product.role',
                      'raw.outcomes',
                      'raw.reactions',
                      to_date('raw.date_created', 'yyyyMMdd').alias('date_created'),
                      to_date('raw.date_started', 'yyyyMMdd').alias('date_started'))

# Step 4: outcomes and reactions are separate arrays on the report, so each
# (product, outcome, reaction) combination becomes its own row. Downstream
# summaries use count_distinct('record_id') so reports are not double-counted.
consumer = consumer.withColumn('outcomes', explode(col('outcomes')))
# explode() cannot be nested inside another expression, hence the two steps.
consumer = consumer.withColumn('reactions', explode(col('reactions')))\
        .withColumn('reactions', initcap(col('reactions')))

# Drop the notebook-level reference to the intermediate frame.
del raw

In [0]:
# Derive calendar month and year columns from the two parsed date columns.
consumer = (
    consumer
    .withColumn('month_created', month('date_created'))
    .withColumn('year_created', year('date_created'))
    .withColumn('month_started', month('date_started'))
    .withColumn('year_started', year('date_started'))
)

In [0]:
# Row count of the flattened table (rows after the explodes, not distinct reports).
record_total = consumer.count()
print("Number of records: ", record_total)

In [0]:
# Persist the flattened table to DEST as snappy-compressed parquet,
# replacing whatever is already stored at that path.
(
    consumer.write
    .mode('overwrite')
    .format('parquet')
    .option('compression', 'snappy')
    .save(DEST)
)

In [0]:
# Distinct record_ids per creation year, sorted ascending by year
# (the first output column).
(
    consumer
    .groupBy('year_created')
    .agg(count_distinct('record_id').alias('annual_records'))
    .orderBy('year_created')
    .display()
)

Databricks visualization. Run in Databricks to view.

In [0]:
# Ten reactions with the most distinct record_ids, largest count first
# (sorted on the second output column).
(
    consumer
    .groupBy('reactions')
    .agg(count_distinct('record_id').alias('reaction_counts'))
    .orderBy('reaction_counts', ascending=False)
    .limit(10)
    .display()
)

Databricks visualization. Run in Databricks to view.

In [0]:
# Ten industry names with the most distinct record_ids, largest count first
# (sorted on the second output column).
(
    consumer
    .groupBy('industry_name')
    .agg(count_distinct('record_id').alias('industry_counts'))
    .orderBy('industry_counts', ascending=False)
    .limit(10)
    .display()
)

Databricks visualization. Run in Databricks to view.

In [0]:
# Ten (brand, outcome) pairs with the most distinct record_ids, largest count
# first (sorted on the third output column).
(
    consumer
    .groupBy('name_brand', 'outcomes')
    .agg(count_distinct('record_id').alias('brand_outcome_count'))
    .orderBy('brand_outcome_count', ascending=False)
    .limit(10)
    .display()
)