# Setup PySpark

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
import pyspark.sql.functions as F
import os
import pandas as pd
import random
import hashlib
import itertools

from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
#os.environ['PYSPARK_SUBMIT_ARGS'] = \
#  '--org.postgresql.Driver /Users/tbiytc/Desktop/LifeZ_Data/postgresql-42.4.1.jar pyspark-shell'
#sc = SparkContext()
conf = SparkConf()  # create the configuration
conf.setMaster("local")
conf.set("spark.jars", "./postgresql-42.4.1.jar")
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark = SparkSession.builder\
                    .config(conf = conf)\
                    .appName('test').getOrCreate()
sc = SparkContext.getOrCreate()

22/08/31 14:12:03 WARN Utils: Your hostname, Jasons-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.2.47 instead (on interface en0)
22/08/31 14:12:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/08/31 14:12:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
spark

---

## Names & Entities Definition

In [1]:
# Set up the database names and accesses
db_name = "test"
db_usrname = "username"
db_pssword = "password"

## Demo Creating and Loading Spark Table

In [3]:
data = [("English", "1000000"), ("Chinese", "10000000000")]
columns = ["language", "users_count"]
df = spark.createDataFrame(data).toDF(*columns)
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------+-----------+
|language|users_count|
+--------+-----------+
| English|    1000000|
| Chinese|10000000000|
+--------+-----------+



                                                                                

In [6]:
df.write.format('jdbc').options(
  url=f'jdbc:postgresql://localhost:5432/{db_name}',
  driver='org.postgresql.Driver',
  dbtable='test_table',
  user=db_usrname,
  password=db_pssword).mode('append').save()

                                                                                

---

# Data Simulation for Coursera Interview Practice

## Data Creation

### CUSTOMERS

In [10]:
cust_cols = ['customer_id', 'customer_first_name', 'customer_last_name', 'customer_email', 'customer_region']

In [11]:
# first_name, last_name, email - 2000
cust_name = pd.read_csv("customer_names_emails.tsv", sep="\t")

In [39]:
# Augmentation
# region
regions = ['Canada', 'USA', 'Mexico']
cust_name['customer_region'] = random.choices(regions, k=len(cust_name))
# id
cust_name['customer_id'] = ''
for i in range(len(cust_name)):
    cust_name['customer_id'].loc[i] = hashlib.md5(cust_name['customer_email'].loc[i].encode()).hexdigest()

In [40]:
cust_name.head()

Unnamed: 0,customer_first_name,customer_last_name,customer_email,customer_region,customer_id
0,Wanda,Butler,wanda.butler@gmail.com,USA,af57bdefc43ed1a97aa1a669e537250d
1,Chloe,Edmunds,chloe.edmunds@gmail.com,Mexico,a628dca7a8e505f448b8801077486593
2,Tim,Dyer,tim.dyer@gmail.com,Canada,33c344a6bf09299998dc3634df4ec292
3,Diana,Lambert,diana.lambert@gmail.com,USA,61bacbc3071913a43adc4f6a72c0f387
4,Neil,North,neil.north@gmail.com,Canada,a72e8d3eca0cafd7ae2ad15f49233c52


### COURSES

In [113]:
course_cols = ['course_id', 'course_title', 'course_creator', 'course_description', 'course_segments', 'course_specialization']

In [114]:
courses = pd.read_csv("courses.tsv", sep="\t")

In [115]:
courses

Unnamed: 0,course_title,course_creator,course_description,course_segments
0,Introduction to Machine Learning,Andrew Ng,A comprehensive introduction course to the top...,10
1,Evolutionary Biology,Charles Darwin,"The gradual introduction to the concept of ""Su...",8
2,Time Travel,Christopher Nolan,How modern cinema intertwines with valid physi...,9
3,KPop Culture and Gen-Z Career Choices,BTS,How k-pop culture is intergrated with gen-z's ...,4


In [116]:
# Augmentation
# specialization
specs = ['Machine Learning', 'Modern Culture', 'Biology & Physics']
courses['course_specialization'] = ['Machine Learning', 'Biophysics', 'Biophysics', 'Society & Culture']
# course id
courses['course_id'] = ''
for i in range(len(courses)):
    courses['course_id'].loc[i] = hashlib.md5(f"{courses['course_title'].loc[i]}_{courses['course_creator'].loc[i]}".encode()).hexdigest()

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  courses['course_id'].loc[i] = hashlib.md5(f"{courses['course_title'].loc[i]}_{courses['course_creator'].loc[i]}".encode()).hexdigest()


In [117]:
courses.head()

Unnamed: 0,course_title,course_creator,course_description,course_segments,course_specialization,course_id
0,Introduction to Machine Learning,Andrew Ng,A comprehensive introduction course to the top...,10,Machine Learning,498e77f7864d53426ff4f60247bfac0a
1,Evolutionary Biology,Charles Darwin,"The gradual introduction to the concept of ""Su...",8,Biophysics,9f6dec2eba4af6a4edaf035d228d7909
2,Time Travel,Christopher Nolan,How modern cinema intertwines with valid physi...,9,Biophysics,31b66379aed0c402de873e0bc089a1b1
3,KPop Culture and Gen-Z Career Choices,BTS,How k-pop culture is intergrated with gen-z's ...,4,Society & Culture,77c3c2e3c364464e7bf930cc4d30fa97


### COURSE_STUDENT_AGGREGATION

In [58]:
agg_cols = ['student_id', 'course_id', 'enrolment_tmstmp']

In [82]:
# Build dataframe
df_agg = pd.DataFrame()

In [83]:
from random import randrange
from datetime import timedelta

def random_date(start, end):
    delta = end - start
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    random_second = randrange(int_delta)
    return start + timedelta(seconds=random_second)

from datetime import datetime
d1 = datetime.strptime('1/1/2018 12:00 AM', '%m/%d/%Y %I:%M %p')
d2 = datetime.strptime('1/1/2022 12:00 AM', '%m/%d/%Y %I:%M %p')

In [95]:
# Merge things in a way that makes sense
# student_id x course_id combinations
combos = list(itertools.product(list(set(cust_name['customer_id'])), list(set(courses['course_id']))))
agg_selections = random.sample(combos, 5000)

In [96]:
# Separate the columns
student_ids = [x[0] for x in agg_selections]
course_ids = [x[1] for x in agg_selections]

In [99]:
df_agg['student_id'] = student_ids
df_agg['course_id'] = course_ids

In [124]:
# Augmentation
# Timestamp
df_agg['enrolment_tmstmp'] = ''
for i in range(len(df_agg)):
    df_agg['enrolment_tmstmp'].loc[i] = random_date(d1, d2)
# student_course_id
df_agg['student_course_id'] = ''
for i in range(len(df_agg)):
    df_agg['student_course_id'].loc[i] = hashlib.md5(f"{df_agg['student_id'].loc[i]}_{df_agg['course_id'].loc[i]}".encode()).hexdigest()

In [125]:
df_agg.head()

Unnamed: 0,student_id,course_id,enrolment_tmstmp,student_course_id
0,64b2bca78b3f5368413aa096fe092f05,498e77f7864d53426ff4f60247bfac0a,2020-10-09 15:11:11,cf38f7cf6c0fd04c48620bc409cee6ee
1,e95e045dc23014e4c0d54198ef93cd32,9f6dec2eba4af6a4edaf035d228d7909,2020-05-20 09:01:12,d5ba114554d1fb7cd71496ab280ebff7
2,5c072518cdfbad9ed6883bca153790a5,9f6dec2eba4af6a4edaf035d228d7909,2020-12-02 22:33:14,70fd4affaf8ab905e3e55be2cc8b8a7c
3,a19cec46e542e2c28023f6e11481fdef,31b66379aed0c402de873e0bc089a1b1,2018-06-01 19:22:21,5efffc6c39bd9b422dbdf124518fe3cd
4,b9997d085b2df603bacfe2cacaf5b18b,77c3c2e3c364464e7bf930cc4d30fa97,2019-07-25 17:34:21,ee642839a47385f423b7106d2f84dbf1


## Convert Into Spark Tables

In [105]:
# CUSTOMER - cust_name
spark_cust_df = spark.createDataFrame(cust_name)

  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [118]:
# COURSES - courses
spark_cors_df = spark.createDataFrame(courses)

  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [126]:
# AGG - df_agg
spark_dagg_df = spark.createDataFrame(df_agg)

  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


## Data Ingestion

In [104]:
import pandas as pd
import os

def write_to_db(data_frame, table_name):
    data_frame.write.format('jdbc').options(
              url=f'jdbc:postgresql://localhost:5432/{db_name}',
              driver='org.postgresql.Driver',
              dbtable=table_name,
              user=db_usrname,
              password=db_pssword).mode('append').save()
    out_message = f"Data frame {data_frame} has been appended to table {table_name} in the PostgreSQL database."
    return out_message

In [110]:
write_to_db(spark_cust_df, 'customers')

'Data frame DataFrame[customer_first_name: string, customer_last_name: string, customer_email: string, customer_region: string, customer_id: string] has been appended to table customers in the PostgreSQL database.'

In [120]:
write_to_db(spark_cors_df, 'courses')

'Data frame DataFrame[course_title: string, course_creator: string, course_description: string, course_segments: bigint, course_specialization: string, course_id: string] has been appended to table courses in the PostgreSQL database.'

In [127]:
write_to_db(spark_dagg_df, 'course_student_agg')

                                                                                

'Data frame DataFrame[student_id: string, course_id: string, enrolment_tmstmp: timestamp, student_course_id: string] has been appended to table course_student_agg in the PostgreSQL database.'

---