
# Data Processing [Business Logic] using PySpark & Spark SQL

In [0]:
# lazy evaluation


> *https://spark.apache.org/docs/latest/api/python/index.html*  
> *https://delta.io*


<br>
  
<img width="1200px" src ='https://owshqblobstg.blob.core.windows.net/stgfiles/png_files/dl_delta_1.png'>
  
<br>


<br>
  
<img width="1200px" src ='https://owshqblobstg.blob.core.windows.net/stgfiles/png_files/bronze_to_spark0.png'>
  
<br>

In [0]:
# [blob storage] = 1
# device

# [s3 ~ minio] = 1
# movies

# [mssql] = 1
# subscription

# [mongodb] = 1
# user

# [rest api] = 1
# ssn

# total = 5

In [0]:
%sql

-- creating a database for logical organization
CREATE DATABASE IF NOT EXISTS OwsHQ;
USE OwsHQ


### *Users*
> *user & ssn*

In [0]:
# users
# reading from delta bronze table
# file resides on the data lake
ds_user = spark.read.format("delta").load("dbfs:/mnt/owshq/delta/batch/bronze/user")
ds_ssn = spark.read.format("delta").load("dbfs:/mnt/owshq/delta/batch/bronze/ssn")

In [0]:
# select the columns to be used for data processing
# grouping user and ssn into the same [domain]
select_columns_users = ds_user.select("user_id","uid","id","first_name","last_name","date_of_birth","gender","employment","dt_current_timestamp")
select_columns_ssn = ds_ssn.select("id", "valid_us_ssn")

In [0]:
# display dataframe
# show data = action
display(select_columns_users)

user_id,uid,id,first_name,last_name,date_of_birth,gender,employment,dt_current_timestamp
3878,c6184a4f-3de3-4109-ae3d-f2652961201a,1262,Maximo,Jacobs,1988-03-12,Male,"List(Legacy Architect, Teamwork)",2021-10-26T14:18:36.032+0000
294,d103fbc6-3f4e-43e1-8a5c-07341122b7ec,2139,Carolin,Mitchell,1997-06-30,Male,"List(Chief Agent, Teamwork)",2021-10-26T14:18:36.032+0000
9384,a708e120-a62c-4776-b4f5-da5459544166,3679,Lurlene,Zemlak,1972-01-23,Agender,"List(Investor Analyst, Proactive)",2021-10-26T14:18:36.032+0000
5074,83312e46-75db-490a-882a-2a2c7aeae24e,3618,Lindy,Hirthe,2001-05-01,Genderqueer,"List(Legacy Technology Agent, Communication)",2021-10-26T14:18:36.032+0000
289,bf0f59df-b2e5-4a8f-80a9-e90589e5ee99,9265,Deena,Willms,1961-09-14,Male,"List(District Healthcare Coordinator, Technical savvy)",2021-10-26T14:18:36.032+0000
3054,819bc50d-35e9-452a-bf69-74e9f28d07af,326,Jenise,Weissnat,1982-05-01,Bigender,"List(Administration Developer, Work under pressure)",2021-10-26T14:18:36.032+0000
5611,a98a3daa-d6b0-4e7c-af49-bb3284e09bdf,5538,Liz,Krajcik,1962-01-27,Genderqueer,"List(Administration Agent, Leadership)",2021-10-26T14:18:36.032+0000
8627,ff1b35ff-8d2b-41bf-a990-eafcee4a0004,5822,Randall,Connelly,1991-06-17,Bigender,"List(Human Sales Executive, Teamwork)",2021-10-26T14:18:36.032+0000
7458,fb4a091c-f366-47db-95ca-2411c9196be1,3485,Edwardo,King,1970-01-11,Polygender,"List(Construction Associate, Leadership)",2021-10-26T14:18:36.032+0000
6565,2f0b755f-324b-4ad0-8a35-57528b9acf0e,144,Lloyd,Robel,1974-06-24,Genderqueer,"List(Senior Hospitality Orchestrator, Confidence)",2021-10-26T14:18:36.032+0000


In [0]:
# display dataframe
display(select_columns_ssn)

id,valid_us_ssn
8295,690-14-8534
2691,218-61-4373
6820,160-24-0370
9302,231-13-0363
5653,148-31-0875
4038,727-78-9808
8170,447-81-3459
4301,862-86-0066
7624,637-94-5637
3377,779-07-5974


In [0]:
# import libraries to perform data augmentation
from pyspark.sql.functions import col, concat_ws
from pyspark.sql.types import DateType, IntegerType

# perform data munging
# selecting columns
# cast data
enhance_column_selection_user = select_columns_users.select(
  col("uid").alias("user_uid"),
  col("id").alias("user_id"),
  concat_ws(" ", col("first_name"), col("last_name")).alias("user_name"),
  col("date_of_birth").cast(DateType()).alias("user_birth_date"),
  col("gender").alias("user_gender"),
  col("employment.title").alias("user_title"), 
  col("employment.key_skill").alias("user_skills"),
  col("dt_current_timestamp").alias("user_dt_current_timestamp")
).distinct()

# based on user dataframe
# register as spark sql object
# display transformed and cleaned dataframe
enhance_column_selection_user.createOrReplaceTempView("vw_user")
display(enhance_column_selection_user)

user_uid,user_id,user_name,user_birth_date,user_gender,user_title,user_skills,user_dt_current_timestamp
3132ffdc-11b7-423c-b124-7a576d961cea,5204,Jody Harris,1984-09-19,Male,Lead Education Analyst,Self-motivated,2021-10-26T14:18:36.032+0000
48debc46-3ea3-4522-8281-ea0c3d4f34a5,5890,Elwood Kozey,1960-06-05,Polygender,IT Strategist,Work under pressure,2021-10-26T14:19:40.277+0000
da68aedf-6d49-4c8f-8a33-4828cc07d4a8,543,Dane Hagenes,1991-11-07,Genderqueer,Hospitality Architect,Confidence,2021-10-26T14:22:09.389+0000
4ba9e3ed-d6a8-4f1d-ac2a-b9f6c295204f,7377,Joe Tillman,1978-08-29,Non-binary,Corporate Associate,Teamwork,2021-10-26T14:22:56.385+0000
fead2eab-3434-467a-8ab3-145f8023a103,3996,Deena Gislason,1983-03-23,Male,Marketing Architect,Confidence,2021-10-26T14:23:19.986+0000
687dc233-5251-4c58-acec-24a8f46f2afa,4550,Asa Schroeder,1980-03-11,Agender,Central Officer,Leadership,2021-10-26T14:23:43.704+0000
5ea28a48-9364-4e17-8735-2972c971b7c4,5142,Shay Welch,1975-11-14,Genderqueer,Technology Executive,Leadership,2021-10-26T14:24:30.661+0000
38d05de8-c605-4b5d-bc46-fe4d53e4cc1e,9908,Brittni Nader,1982-07-11,Bigender,Global Real-Estate Representative,Leadership,2021-10-26T14:24:30.661+0000
af67905c-7ef1-41b3-bb30-d2c98dc346a9,2618,Floria Turner,1994-12-26,Non-binary,Product Executive,Teamwork,2021-10-26T14:26:33.193+0000
4ca842cb-4876-4a37-911e-af3be270c3c3,3773,Sadie Walker,1982-05-09,Polygender,Central Manager,Networking skills,2021-10-26T14:27:28.417+0000


In [0]:
# select important columns
# ssn can be masked to hide value
# social security number
enhance_column_selection_ssn = select_columns_ssn.select(
  col("id").alias("ssn_user_id"),
  col("valid_us_ssn").alias("ssn"),
).distinct()

# based on ssn dataframe
# register as spark sql object
# display transformed and cleaned dataframe
enhance_column_selection_ssn.createOrReplaceTempView("vw_ssn")
display(enhance_column_selection_ssn)

ssn_user_id,ssn
5474,044-88-8493
5586,610-68-3891
94,249-58-9335
7348,227-30-6886
8885,538-87-0136
2853,102-58-5284
6236,425-16-8149
2195,620-36-3994
6934,218-38-1000
9586,203-02-6979


In [0]:
# register latest user and ssn improvements 
# spark sql engine available for query
enhance_column_selection_user.registerTempTable("vw_enhanced_user")
enhance_column_selection_ssn.registerTempTable("vw_enhanced_ssn")

In [0]:
%sql

SELECT *
FROM vw_enhanced_user AS u
INNER JOIN vw_enhanced_ssn AS s 
ON u.user_id = s.ssn_user_id

user_uid,user_id,user_name,user_birth_date,user_gender,user_title,user_skills,user_dt_current_timestamp,ssn_user_id,ssn
fead2eab-3434-467a-8ab3-145f8023a103,3996,Deena Gislason,1983-03-23,Male,Marketing Architect,Confidence,2021-10-26T14:23:19.986+0000,3996,092-16-2314
e8fb774b-d777-449f-8f6f-787bf79cc560,5899,Saturnina Armstrong,1965-01-04,Polygender,Central Strategist,Work under pressure,2021-10-26T14:43:51.422+0000,5899,209-65-5178
1e3db346-fd1f-4fb7-b1fa-14a6a8b4dc0c,5636,Gilberto Hayes,1993-03-27,Agender,Mining Strategist,Leadership,2021-10-26T14:20:09.149+0000,5636,537-21-3981
387ef65d-a685-4322-bb2b-08bf4b664d4a,2839,Ilana Schroeder,1964-03-05,Female,Retail Associate,Teamwork,2021-10-26T14:26:09.544+0000,2839,805-94-4575
e611b5bf-42e8-470a-a66c-aad567c22e97,45,Mose Wolff,1997-08-17,Agender,District Real-Estate Coordinator,Proactive,2021-10-26T14:31:59.981+0000,45,726-59-4193
6eef317a-0e1f-4811-ac52-4b461fe22ce3,1001,Modesto Barton,1997-09-14,Male,Manufacturing Consultant,Technical savvy,2021-10-26T14:32:22.395+0000,1001,831-92-3979
6f7edcfc-7ed8-4f69-8e79-2629c919fa33,2032,Kallie Gorczany,1968-10-13,Female,Design Manager,Communication,2021-10-26T14:37:15.061+0000,2032,341-27-3254
a5c5cdf2-3368-44d2-9ae5-ba836fb44bee,4198,Karlyn Braun,1992-11-21,Non-binary,Human Sales Designer,Leadership,2021-10-26T14:54:25.960+0000,4198,237-62-2998
05596548-ff3d-4b9d-bcad-ab4e0c23722d,4424,Karl Will,1971-01-13,Bigender,Direct Community-Services Producer,Proactive,2021-10-26T14:24:55.077+0000,4424,342-39-1438
06a3db50-4ecc-4a82-9e8d-8223720b61b7,6335,Bernadette Haag,1979-08-15,Agender,Retail Strategist,Confidence,2021-10-26T14:22:09.389+0000,6335,256-45-1061



<br>
  
<img width="900px" src ='https://owshqblobstg.blob.core.windows.net/stgfiles/png_files/bronze_silver_tables0.png'>
  
<br>

In [0]:
# perform left outer join from user to ssn, process to group bronze tables 
# joins can be tricky, try to use spark 3.0 as much as you can due to the aqe [adaptive query execution] ~ adaptive query execution
# domains can be used, normally for large enterprises that has different users for e.g. grouping all the bronze users 4to a silver user makes sense
left_join_users = enhance_column_selection_user.join(enhance_column_selection_ssn, enhance_column_selection_user.user_id == enhance_column_selection_ssn.ssn_user_id, how='left')

# store the join of the bronze dataframes into a new silver table
# data is saved into a delta table however this file is stored on data lake
# silver table has the cleansed and enriched data
# appending data inside of silver table
left_join_users.write.format("delta").mode("append").save("dbfs:/mnt/owshq/delta/batch/silver/users")

# display data inside of dataframe
display(left_join_users)

user_uid,user_id,user_name,user_birth_date,user_gender,user_title,user_skills,user_dt_current_timestamp,ssn_user_id,ssn
3132ffdc-11b7-423c-b124-7a576d961cea,5204,Jody Harris,1984-09-19,Male,Lead Education Analyst,Self-motivated,2021-10-26T14:18:36.032+0000,,
48debc46-3ea3-4522-8281-ea0c3d4f34a5,5890,Elwood Kozey,1960-06-05,Polygender,IT Strategist,Work under pressure,2021-10-26T14:19:40.277+0000,,
da68aedf-6d49-4c8f-8a33-4828cc07d4a8,543,Dane Hagenes,1991-11-07,Genderqueer,Hospitality Architect,Confidence,2021-10-26T14:22:09.389+0000,,
4ba9e3ed-d6a8-4f1d-ac2a-b9f6c295204f,7377,Joe Tillman,1978-08-29,Non-binary,Corporate Associate,Teamwork,2021-10-26T14:22:56.385+0000,,
fead2eab-3434-467a-8ab3-145f8023a103,3996,Deena Gislason,1983-03-23,Male,Marketing Architect,Confidence,2021-10-26T14:23:19.986+0000,3996.0,092-16-2314
687dc233-5251-4c58-acec-24a8f46f2afa,4550,Asa Schroeder,1980-03-11,Agender,Central Officer,Leadership,2021-10-26T14:23:43.704+0000,,
5ea28a48-9364-4e17-8735-2972c971b7c4,5142,Shay Welch,1975-11-14,Genderqueer,Technology Executive,Leadership,2021-10-26T14:24:30.661+0000,,
38d05de8-c605-4b5d-bc46-fe4d53e4cc1e,9908,Brittni Nader,1982-07-11,Bigender,Global Real-Estate Representative,Leadership,2021-10-26T14:24:30.661+0000,,
af67905c-7ef1-41b3-bb30-d2c98dc346a9,2618,Floria Turner,1994-12-26,Non-binary,Product Executive,Teamwork,2021-10-26T14:26:33.193+0000,,
4ca842cb-4876-4a37-911e-af3be270c3c3,3773,Sadie Walker,1982-05-09,Polygender,Central Manager,Networking skills,2021-10-26T14:27:28.417+0000,,


In [0]:
%sql

-- query silver tables
-- understanding of the schema and data

SELECT *
FROM vw_enhanced_user AS user
LEFT OUTER JOIN vw_enhanced_ssn AS ssn
ON user.user_id = ssn.ssn_user_id

user_uid,user_id,user_name,user_birth_date,user_gender,user_title,user_skills,user_dt_current_timestamp,ssn_user_id,ssn
3132ffdc-11b7-423c-b124-7a576d961cea,5204,Jody Harris,1984-09-19,Male,Lead Education Analyst,Self-motivated,2021-10-26T14:18:36.032+0000,,
48debc46-3ea3-4522-8281-ea0c3d4f34a5,5890,Elwood Kozey,1960-06-05,Polygender,IT Strategist,Work under pressure,2021-10-26T14:19:40.277+0000,,
da68aedf-6d49-4c8f-8a33-4828cc07d4a8,543,Dane Hagenes,1991-11-07,Genderqueer,Hospitality Architect,Confidence,2021-10-26T14:22:09.389+0000,,
4ba9e3ed-d6a8-4f1d-ac2a-b9f6c295204f,7377,Joe Tillman,1978-08-29,Non-binary,Corporate Associate,Teamwork,2021-10-26T14:22:56.385+0000,,
fead2eab-3434-467a-8ab3-145f8023a103,3996,Deena Gislason,1983-03-23,Male,Marketing Architect,Confidence,2021-10-26T14:23:19.986+0000,3996.0,092-16-2314
687dc233-5251-4c58-acec-24a8f46f2afa,4550,Asa Schroeder,1980-03-11,Agender,Central Officer,Leadership,2021-10-26T14:23:43.704+0000,,
5ea28a48-9364-4e17-8735-2972c971b7c4,5142,Shay Welch,1975-11-14,Genderqueer,Technology Executive,Leadership,2021-10-26T14:24:30.661+0000,,
38d05de8-c605-4b5d-bc46-fe4d53e4cc1e,9908,Brittni Nader,1982-07-11,Bigender,Global Real-Estate Representative,Leadership,2021-10-26T14:24:30.661+0000,,
af67905c-7ef1-41b3-bb30-d2c98dc346a9,2618,Floria Turner,1994-12-26,Non-binary,Product Executive,Teamwork,2021-10-26T14:26:33.193+0000,,
4ca842cb-4876-4a37-911e-af3be270c3c3,3773,Sadie Walker,1982-05-09,Polygender,Central Manager,Networking skills,2021-10-26T14:27:28.417+0000,,


In [0]:
%sql

-- count rows
-- 21.200
-- 22.203

SELECT COUNT(*) AS registered_users_platform
FROM vw_enhanced_user AS user
LEFT OUTER JOIN vw_enhanced_ssn AS ssn
ON user.user_id = ssn.ssn_user_id

/*
SELECT user.user_id, COUNT(*) AS amount
FROM vw_enhanced_user AS user
LEFT OUTER JOIN vw_enhanced_ssn AS ssn
ON user.user_id = ssn.ssn_user_id
GROUP BY user.user_id
ORDER BY amount DESC

SELECT *
FROM vw_enhanced_user AS user
LEFT OUTER JOIN vw_enhanced_ssn AS ssn
ON user.user_id = ssn.ssn_user_id
WHERE user.user_id = 9953
*/

registered_users_platform
10013



### *Subscriptions*
> *subscription & device*

In [0]:
# subscription & device
# reading from bronze location
# physical data stored on storage system
ds_subscription = spark.read.format("delta").load("dbfs:/mnt/owshq/delta/batch/bronze/subscription")
ds_device = spark.read.format("delta").load("dbfs:/mnt/owshq/delta/batch/bronze/device")

# select columns for data processing
# using pyspark for selection
select_columns_subscription = ds_subscription.select("user_id","plan","status")
select_columns_device = ds_device.select("user_id","uid","model","manufacturer","platform")

In [0]:
# display dataframe in-memory
display(select_columns_subscription)

user_id,plan,status
163,Standard,Pending
1000,Standard,Idle
611,Silver,Active
289,Essential,Pending
580,Starter,Active
827,Business,Pending
159,Diamond,Active
985,Diamond,Active
399,Bronze,Idle
958,Free Trial,Pending


In [0]:
# display dataframe in-memory
display(select_columns_device)

user_id,uid,model,manufacturer,platform
1328,3e2cf22c-6363-4c0e-9888-8b259238d285,iPhone 5C,Xiamomi,Windows Phone
6640,1cc2cd57-ef40-4e0b-9c3c-bc5ff62dbaec,Huawei Mate 20,Dell,Firefox OS
1940,754c13c2-7b09-4c33-892e-fd74ffcd47a2,Xiaomi Mi 8 Pro,Lenovo,Ubuntu Touch
6489,2698f4c9-fd85-4009-80f7-867e39a75e45,iPhone 4S,HP,iOS
3234,2a5a5fb1-039d-4d43-ab69-a5c01d64ef77,Samsung Galaxy S8,OnePlus,Windows RT
3912,1018558b-93d0-4102-9f9f-d49ba2a17e34,iPhone 6S / 6S Plus,ASUS,Windows 10 Mobile
4604,ef199195-eb60-4412-8677-bcdb05531b2a,Samsung Galaxy S4,Huawei,Firefox OS
1213,4bd175e5-b299-4abc-9802-b6d00dfd691d,Google Pixel 3,Xiamomi,Windows 10 Mobile
9015,dbaabca3-0b7d-4e1c-b52c-ea66f4a0ea31,Huawei P20 Pro,Huawei,Android OS
3146,d6c8aed6-8ce9-47c1-9f10-cd5313247955,Xiaomi Redmi Note 6 Pro,HP,Danger OS


In [0]:
# use one of the most sexy ways to build functions
# create a python function using the business logic
# classification of users based on their subscription level
# write code in [scala]
def subscription_importance(subscription_plan):
  if subscription_plan in ("Business","Diamond","Gold","Platinum","Premium"):
    return "High"
  if subscription_plan in ("Bronze","Essential","Professional","Silver","Standard"):
    return "Normal"
  else:
    return "Low"

# register function into spark's engine to make it available
# once registered you can access in [any] language
spark.udf.register("fn_subscription_importance", subscription_importance)

# select columns of subscription
# use alias to save your upfront process ~ silver
# better name understanding for the business
enhance_column_selection_subscription = select_columns_subscription.select(
  col("user_id").alias("subscription_user_id"),
  col("plan").alias("subscription_plan"),
  col("status").alias("subscription_status"),
).distinct()

# register as a spark sql object
enhance_column_selection_subscription.registerTempTable("vw_subscription")

# build another way to create functions
# using spark sql engine capability to perform a case when
# save the sql into a dataframe
enhance_column_selection_subscription = spark.sql("""
SELECT subscription_user_id, 
       subscription_plan, 
       CASE WHEN subscription_plan = 'Basic' THEN 6.00 
            WHEN subscription_plan = 'Bronze' THEN 8.00 
            WHEN subscription_plan = 'Business' THEN 10.00 
            WHEN subscription_plan = 'Diamond' THEN 14.00
            WHEN subscription_plan = 'Essential' THEN 9.00 
            WHEN subscription_plan = 'Free Trial' THEN 0.00
            WHEN subscription_plan = 'Gold' THEN 25.00
            WHEN subscription_plan = 'Platinum' THEN 9.00
            WHEN subscription_plan = 'Premium' THEN 13.00
            WHEN subscription_plan = 'Professional' THEN 17.00
            WHEN subscription_plan = 'Silver' THEN 11.00
            WHEN subscription_plan = 'Standard' THEN 13.00
            WHEN subscription_plan = 'Starter' THEN 5.00
            WHEN subscription_plan = 'Student' THEN 2.00
       ELSE 0.00 END AS subscription_price,
       subscription_status,
       fn_subscription_importance(subscription_plan) AS subscription_importance
FROM vw_subscription
""")

In [0]:
%sql

-- use function created on pyspark into sql's engine
-- dataframe previously registered = vw_subscription (temp table)
-- query access at blazing fast speed

SELECT *, fn_subscription_importance(subscription_plan) AS importance
FROM vw_subscription

subscription_user_id,subscription_plan,subscription_status,importance
8727,Platinum,Idle,High
8891,Gold,Pending,High
8707,Free Trial,Blocked,Low
8567,Basic,Active,Low
8597,Student,Pending,Low
8832,Standard,Idle,Normal
8839,Premium,Idle,High
8607,Free Trial,Blocked,Low
8880,Essential,Idle,Normal
8715,Business,Active,High


In [0]:
# display enhanced data of subscription
display(enhance_column_selection_subscription)

subscription_user_id,subscription_plan,subscription_price,subscription_status,subscription_importance
686,Professional,17.0,Pending,Normal
111,Free Trial,0.0,Pending,Low
68,Student,2.0,Idle,Low
8,Diamond,14.0,Pending,High
266,Free Trial,0.0,Pending,Low
220,Diamond,14.0,Active,High
111,Gold,25.0,Idle,High
996,Essential,9.0,Pending,Normal
237,Essential,9.0,Idle,Normal
864,Platinum,9.0,Idle,High


In [0]:
# select columns to be used 
# bronze = raw
# same data coming from data lake
enhance_column_selection_device = select_columns_device.select(
  col("user_id").alias("device_user_id"),
  col("model").alias("device_model"),
).distinct()

# make it available into spark's sql engine
enhance_column_selection_device.registerTempTable("vw_device")
display(enhance_column_selection_device)

device_user_id,device_model
3719,Samsung Galaxy S4
6608,Google Pixel 2
3643,Samsung Galaxy S7 Edge
8988,Xiaomi Redmi Note 6 Pro
3592,iPhone X
3124,Huawei P10 Plus
9575,OnePlus One
3428,OnePlus 2
1851,iPhone 6 / 6 Plus
141,OnePlus 3T


In [0]:
# register latest user and ssn improvements 
# spark sql engine available for query
enhance_column_selection_subscription.registerTempTable("vw_enhanced_subscription")
enhance_column_selection_device.registerTempTable("vw_enhanced_device")


<br>
  
<img width="900px" src ='https://brzluanmoreno.blob.core.windows.net/stgfiles/png_files/bronze_silver_tables0.png'>
  
<br>

In [0]:
# perform inner join between the subscription and device
# figure out which devices are being used to watch movies
inner_join_subscriptions = enhance_column_selection_subscription.join(enhance_column_selection_device, enhance_column_selection_subscription.subscription_user_id == enhance_column_selection_device.device_user_id, how='inner')

# save data from dataframe to silver delta table
# don't forget, physical data is stored into storage system for e.g. blob storage, s3, gcs
inner_join_subscriptions.write.format("delta").mode("append").save("dbfs:/mnt/owshq/delta/batch/silver/subscriptions")
display(inner_join_subscriptions)

subscription_user_id,subscription_plan,subscription_price,subscription_status,subscription_importance,device_user_id,device_model
6608,Gold,25.0,Pending,High,6608,Google Pixel 2
9759,Essential,9.0,Idle,Normal,9759,Samsung Galaxy S4 Mini
6540,Free Trial,0.0,Blocked,Low,6540,Xiaomi Mi 8 Lite
8880,Essential,9.0,Idle,Normal,8880,Huawei P Smart
7987,Gold,25.0,Pending,High,7987,Huawei Mate 20 Pro
7297,Premium,13.0,Pending,High,7297,Samsung Galaxy S9
7432,Free Trial,0.0,Blocked,Low,7432,Google Pixel 2
719,Starter,5.0,Blocked,Low,719,Huawei P10 Plus
6605,Free Trial,0.0,Blocked,Low,6605,Google Pixel 2
1341,Professional,17.0,Idle,Normal,1341,Huawei P Smart


In [0]:
%sql

-- 2.925
-- 5.038

SELECT COUNT(*)
FROM vw_enhanced_subscription AS subscription
INNER JOIN vw_enhanced_device AS device
ON subscription.subscription_user_id = device.device_user_id

count(1)
545


In [0]:
%sql

SELECT *
FROM vw_enhanced_subscription AS subscription
INNER JOIN vw_enhanced_device AS device
ON subscription.subscription_user_id = device.device_user_id

subscription_user_id,subscription_plan,subscription_price,subscription_status,subscription_importance,device_user_id,device_model
9340,Free Trial,0.0,Blocked,Low,9340,Samsung Galaxy S3
9340,Premium,13.0,Pending,High,9340,Samsung Galaxy S3
9340,Diamond,14.0,Pending,High,9340,Samsung Galaxy S3
9340,Diamond,14.0,Idle,High,9340,Samsung Galaxy S3
9340,Silver,11.0,Idle,Normal,9340,Samsung Galaxy S3
9684,Essential,9.0,Active,Normal,9684,Huawei Mate 9
9684,Silver,11.0,Active,Normal,9684,Huawei Mate 9
9684,Platinum,9.0,Pending,High,9684,Huawei Mate 9
9684,Business,10.0,Idle,High,9684,Huawei Mate 9
9684,Bronze,8.0,Idle,Normal,9684,Huawei Mate 9



### *Movies*
> *movies and ratings*

In [0]:
# movies
# data coming from storage system
# delta format = bronze
ds_movies = spark.read.format("delta").load("dbfs:/mnt/owshq/delta/batch/bronze/movies")

In [0]:
# display dataframe data
display(ds_movies)

id,imdb_id,user_id,title,genres,status,release_date,original_language,popularity,production_companies,production_countries,vote_count,rating,dt_current_timestamp
22969,tt0028872,262,Fire Over England,"[{'id': 10752, 'name': 'War'}, {'id': 18, 'name': 'Drama'}, {'id': 12, 'name': 'Adventure'}, {'id': 36, 'name': 'History'}, {'id': 10749, 'name': 'Romance'}]",Released,1937-02-25,en,1.722841,"[{'name': 'London Film Productions', 'id': 659}]","[{'iso_3166_1': 'GB', 'name': 'United Kingdom'}]",7.0,5.0,1635245045917
22969,tt0028872,262,Fire Over England,"[{'id': 10752, 'name': 'War'}, {'id': 18, 'name': 'Drama'}, {'id': 12, 'name': 'Adventure'}, {'id': 36, 'name': 'History'}, {'id': 10749, 'name': 'Romance'}]",Released,1937-02-25,en,1.722841,"[{'name': 'London Film Productions', 'id': 659}]","[{'iso_3166_1': 'GB', 'name': 'United Kingdom'}]",7.0,3.0,1635245045917
22969,tt0028872,262,Fire Over England,"[{'id': 10752, 'name': 'War'}, {'id': 18, 'name': 'Drama'}, {'id': 12, 'name': 'Adventure'}, {'id': 36, 'name': 'History'}, {'id': 10749, 'name': 'Romance'}]",Released,1937-02-25,en,1.722841,"[{'name': 'London Film Productions', 'id': 659}]","[{'iso_3166_1': 'GB', 'name': 'United Kingdom'}]",7.0,4.5,1635245045917
22969,tt0028872,262,Fire Over England,"[{'id': 10752, 'name': 'War'}, {'id': 18, 'name': 'Drama'}, {'id': 12, 'name': 'Adventure'}, {'id': 36, 'name': 'History'}, {'id': 10749, 'name': 'Romance'}]",Released,1937-02-25,en,1.722841,"[{'name': 'London Film Productions', 'id': 659}]","[{'iso_3166_1': 'GB', 'name': 'United Kingdom'}]",7.0,3.0,1635245045917
22969,tt0028872,262,Fire Over England,"[{'id': 10752, 'name': 'War'}, {'id': 18, 'name': 'Drama'}, {'id': 12, 'name': 'Adventure'}, {'id': 36, 'name': 'History'}, {'id': 10749, 'name': 'Romance'}]",Released,1937-02-25,en,1.722841,"[{'name': 'London Film Productions', 'id': 659}]","[{'iso_3166_1': 'GB', 'name': 'United Kingdom'}]",7.0,5.0,1635245045917
22969,tt0028872,262,Fire Over England,"[{'id': 10752, 'name': 'War'}, {'id': 18, 'name': 'Drama'}, {'id': 12, 'name': 'Adventure'}, {'id': 36, 'name': 'History'}, {'id': 10749, 'name': 'Romance'}]",Released,1937-02-25,en,1.722841,"[{'name': 'London Film Productions', 'id': 659}]","[{'iso_3166_1': 'GB', 'name': 'United Kingdom'}]",7.0,4.0,1635245045917
22969,tt0028872,262,Fire Over England,"[{'id': 10752, 'name': 'War'}, {'id': 18, 'name': 'Drama'}, {'id': 12, 'name': 'Adventure'}, {'id': 36, 'name': 'History'}, {'id': 10749, 'name': 'Romance'}]",Released,1937-02-25,en,1.722841,"[{'name': 'London Film Productions', 'id': 659}]","[{'iso_3166_1': 'GB', 'name': 'United Kingdom'}]",7.0,3.0,1635245045917
22969,tt0028872,262,Fire Over England,"[{'id': 10752, 'name': 'War'}, {'id': 18, 'name': 'Drama'}, {'id': 12, 'name': 'Adventure'}, {'id': 36, 'name': 'History'}, {'id': 10749, 'name': 'Romance'}]",Released,1937-02-25,en,1.722841,"[{'name': 'London Film Productions', 'id': 659}]","[{'iso_3166_1': 'GB', 'name': 'United Kingdom'}]",7.0,3.0,1635245045917
22969,tt0028872,262,Fire Over England,"[{'id': 10752, 'name': 'War'}, {'id': 18, 'name': 'Drama'}, {'id': 12, 'name': 'Adventure'}, {'id': 36, 'name': 'History'}, {'id': 10749, 'name': 'Romance'}]",Released,1937-02-25,en,1.722841,"[{'name': 'London Film Productions', 'id': 659}]","[{'iso_3166_1': 'GB', 'name': 'United Kingdom'}]",7.0,4.0,1635245045917
22969,tt0028872,262,Fire Over England,"[{'id': 10752, 'name': 'War'}, {'id': 18, 'name': 'Drama'}, {'id': 12, 'name': 'Adventure'}, {'id': 36, 'name': 'History'}, {'id': 10749, 'name': 'Romance'}]",Released,1937-02-25,en,1.722841,"[{'name': 'London Film Productions', 'id': 659}]","[{'iso_3166_1': 'GB', 'name': 'United Kingdom'}]",7.0,4.5,1635245045917


In [0]:
# import libraries 
# select needed columns or data augmentation
from pyspark.sql.functions import col, expr, when
select_columns_movies = ds_movies.select("user_id","title","status","original_language","rating","dt_current_timestamp")

# enhance columns for better data understanding
# building expr as another way to impose transformations
# craft case when based on ratings
enhance_column_selection_movies = select_columns_movies.select(
  col("user_id").alias("movie_user_id"),
  col("title").alias("movie_title"),
  col("original_language").alias("movie_language"),
  expr("CASE WHEN original_language = 'ja' THEN 'Japanese' " + "WHEN original_language = 'en' THEN 'English' WHEN original_language = 'it' THEN 'Italian' WHEN original_language = 'fr' THEN 'French' ''" + "ELSE 'Unknow' END").alias("movie_language_desc"),
  col("rating").alias("movie_rating"),
  (when(col("rating").between(1,3),"rotten")).when(col("rating").between(4,7),"fresh").otherwise("unclassified").alias("rotten_tomatoes_rating"),
  col("dt_current_timestamp").alias("movie_dt_current_timestamp")
)

# display data
display(enhance_column_selection_movies)

movie_user_id,movie_title,movie_language,movie_language_desc,movie_rating,rotten_tomatoes_rating,movie_dt_current_timestamp
848,The Goddess,zh,Unknow,5.0,fresh,1648567855949
848,The Goddess,zh,Unknow,4.5,fresh,1648567855949
848,The Goddess,zh,Unknow,2.5,rotten,1648567855949
848,The Goddess,zh,Unknow,3.5,unclassified,1648567855949
848,The Goddess,zh,Unknow,4.0,fresh,1648567855949
848,The Goddess,zh,Unknow,2.0,rotten,1648567855949
797,Lost in Translation,en,English,3.0,rotten,1648567855949
797,Lost in Translation,en,English,2.0,rotten,1648567855949
797,Lost in Translation,en,English,3.0,rotten,1648567855949
797,Lost in Translation,en,English,5.0,fresh,1648567855949



<br>
  
<img width="900px" src ='https://brzluanmoreno.blob.core.windows.net/stgfiles/png_files/bronze_silver_tables0.png'>
  
<br>

In [0]:
# store from dataframe to silver table
# write as append mode operation into delta format
# data is stored on data lake system
enhance_column_selection_movies.write.format("delta").mode("append").save("dbfs:/mnt/owshq/delta/batch/silver/movies")


# Read Silver Delta Tables [Domains]

> *Users*  
> *Subscriptions*  
> *Movies*

In [0]:
%sql

SELECT *
FROM delta.`dbfs:/mnt/owshq/delta/batch/silver/users`

user_uid,user_id,user_name,user_birth_date,user_gender,user_title,user_skills,user_dt_current_timestamp,ssn_user_id,ssn
2687a004-a969-4ec1-a9c1-4e5d1fdb4870,247,Bethel Kris,1986-04-24,Bigender,Customer Engineer,Fast learner,2021-04-12T19:25:43.776+0000,,
520ad1b3-8dfd-4893-8377-8cf1dfaa364c,6733,Emily Windler,1958-07-18,Genderqueer,Banking Architect,Proactive,2021-04-13T02:26:02.198+0000,,
925642dd-e471-44de-b0fa-493d50da034d,8824,Benton Predovic,1993-05-31,Polygender,Customer Technology Architect,Proactive,2021-04-13T04:39:32.693+0000,,
ab193d4a-ead4-45aa-bf16-b00e62ea419e,9307,Rickie Mertz,1982-04-07,Genderqueer,Internal Administration Specialist,Communication,2021-04-13T05:00:50.706+0000,,
df2b9cef-fc48-4d7b-82c6-5747afdf994d,6883,Phoebe Brekke,1996-06-02,Non-binary,Mining Planner,Teamwork,2021-04-13T08:02:03.821+0000,,
578381c1-f821-4966-80de-4bbb5c397601,1845,Florinda Halvorson,2002-10-07,Male,Customer Community-Services Administrator,Work under pressure,2021-04-13T08:02:27.269+0000,,
981dbf9c-27d7-4f27-9e7b-db1bdc6b2f2f,8590,Jeremiah Crooks,1963-04-03,Genderqueer,Global Consultant,Technical savvy,2021-04-13T08:10:23.760+0000,,
41049d88-42aa-4515-921c-830055ab56f4,6871,Ryan Jones,1986-06-27,Polygender,Forward Architect,Fast learner,2021-04-13T08:20:21.816+0000,,
8ddbd468-6c07-4499-9fb5-8a042556d65d,6676,Evelin Bernhard,1967-03-23,Genderqueer,Marketing Planner,Work under pressure,2021-04-13T08:20:21.816+0000,,
66f50833-c210-4e4a-846b-181dfd19c8bb,7657,Claude Hauck,1968-07-22,Female,Sales Supervisor,Self-motivated,2021-04-13T09:14:16.157+0000,,


In [0]:
%sql

SELECT *
FROM delta.`dbfs:/mnt/owshq/delta/batch/silver/subscriptions`

subscription_user_id,subscription_plan,subscription_price,subscription_status,subscription_importance,device_user_id,device_model
8815,Basic,6.0,Idle,Low,8815,Google Pixel 3 XL
5558,Standard,13.0,Blocked,Normal,5558,OnePlus 3
5716,Starter,5.0,Blocked,Low,5716,Samsung Galaxy S8 Plus
5049,Bronze,8.0,Pending,Normal,5049,iPhone 5S
5507,Starter,5.0,Active,Low,5507,Samsung Galaxy S6 Edge
8889,Gold,25.0,Blocked,High,8889,Samsung Galaxy S7 Edge
5344,Starter,5.0,Active,Low,5344,Samsung Galaxy S1
8786,Professional,17.0,Pending,Normal,8786,Huawei P20
8250,Essential,9.0,Active,Normal,8250,iPhone 3G
5410,Starter,5.0,Blocked,Low,5410,Google Pixel 2


In [0]:
%sql

SELECT *
FROM delta.`dbfs:/mnt/owshq/delta/batch/silver/movies`

movie_user_id,movie_title,movie_language,movie_language_desc,movie_rating,rotten_tomatoes_rating,movie_dt_current_timestamp
6097,Act Like You Love Me,en,English,3.0,rotten,1621342121812
7009,Suspect,en,English,3.5,unclassified,1621342121812
7009,Suspect,en,English,2.0,rotten,1621342121812
5104,Policewomen,en,English,5.0,fresh,1621342121812
7948,Mad Max 2: The Road Warrior,en,English,4.5,fresh,1621342121812
9510,The Nutcracker: The Untold Story,en,English,1.0,rotten,1621342121812
2196,Heavy Petting,en,English,2.0,rotten,1621342121812
1747,Cure,ja,Japanese,4.5,fresh,1621342121812
2550,Gimme Shelter,en,English,4.0,fresh,1621342121812
9626,Hidden,en,English,3.0,rotten,1621342121812
