### Import pypark and other libraries

In [44]:
# findspark is a good tool to load pyspark in notebook
import findspark 
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import lit
from datetime import datetime
import sys
from datetime import timedelta
import pandas as pd
import pytz

### Init spark session and define global variables

In [45]:
# function to init a spark session
def init_spark_session(app_name):
    spark_session = SparkSession.builder.appName(app_name).getOrCreate()
    return spark_session

In [46]:
# init spark session
app_name = 'pyspark example of basic functions'
spark_session = init_spark_session(app_name)

In [47]:
# define the base uri of hadoop file system
hdfs_base_uri = 'hdfs://node-master:9000//user/hadoop/spark_examples'

In [48]:
# define user, transaction, user transaction data folder (csv and parquet)
user_parq_folder = 'user_parq'
user_csv_folder = 'user_csv'
transaction_parq_folder = 'transaction_parq'
transaction_csv_folder = 'transaction_csv'
user_transaction_parq_folder = 'user_transaction_parq'
user_transaction_csv_folder = 'user_transaction_csv'

### Create fake table and data

In [49]:
# user table fields: 'id', 'email', 'first_name', 'last_name', 'created_date', 'modified_date'
# generate 100 users
user_schema = ['id', 'email', 'first_name', 'last_name', 'date_of_birth', 'created_date', 'modified_date']
user_data = [(i, 'test_' + str(i) + '@gmail.com', 'fname_' + str(i%10+1), 'lname_' + str(i%10+1), datetime(1960+i%20, i%12+1, i%25+1),
              datetime(2019, 8, i%30+1), datetime(2019, 9, i%30+1)) for i in range(1,101)]

In [50]:
# transaction table fields: 'id', 'amount', 'created_date'
# generate 1000 transactions
transaction_schema = ['id', 'amount', 'created_date']
transaction_data = [(i, i%10+1, datetime(2019, 9, i%30+1)) for i in range(1,1001)]

In [51]:
# user transaction relationship table fields: 'user_id', 'transaction_id'
user_transaction_schema = ['user_id', 'transaction_id']
user_transaction_data = [(i%100+1, i+1) for i in range(1000)]

### Create pyspark Dataframe from fake data

In [52]:
df_user = spark_session.createDataFrame(user_data, user_schema)
df_transaction = spark_session.createDataFrame(transaction_data, transaction_schema)
df_user_transaction = spark_session.createDataFrame(user_transaction_data, user_transaction_schema)

In [53]:
# show user dataframe
df_user.show()

+---+-----------------+----------+---------+-------------------+-------------------+-------------------+
| id|            email|first_name|last_name|      date_of_birth|       created_date|      modified_date|
+---+-----------------+----------+---------+-------------------+-------------------+-------------------+
|  1| test_1@gmail.com|   fname_2|  lname_2|1961-02-02 00:00:00|2019-08-02 00:00:00|2019-09-02 00:00:00|
|  2| test_2@gmail.com|   fname_3|  lname_3|1962-03-03 00:00:00|2019-08-03 00:00:00|2019-09-03 00:00:00|
|  3| test_3@gmail.com|   fname_4|  lname_4|1963-04-04 00:00:00|2019-08-04 00:00:00|2019-09-04 00:00:00|
|  4| test_4@gmail.com|   fname_5|  lname_5|1964-05-05 00:00:00|2019-08-05 00:00:00|2019-09-05 00:00:00|
|  5| test_5@gmail.com|   fname_6|  lname_6|1965-06-06 00:00:00|2019-08-06 00:00:00|2019-09-06 00:00:00|
|  6| test_6@gmail.com|   fname_7|  lname_7|1966-07-07 00:00:00|2019-08-07 00:00:00|2019-09-07 00:00:00|
|  7| test_7@gmail.com|   fname_8|  lname_8|1967-08-08 

In [54]:
# show transaction dataframe
df_transaction.show()

+---+------+-------------------+
| id|amount|       created_date|
+---+------+-------------------+
|  1|     2|2019-09-02 00:00:00|
|  2|     3|2019-09-03 00:00:00|
|  3|     4|2019-09-04 00:00:00|
|  4|     5|2019-09-05 00:00:00|
|  5|     6|2019-09-06 00:00:00|
|  6|     7|2019-09-07 00:00:00|
|  7|     8|2019-09-08 00:00:00|
|  8|     9|2019-09-09 00:00:00|
|  9|    10|2019-09-10 00:00:00|
| 10|     1|2019-09-11 00:00:00|
| 11|     2|2019-09-12 00:00:00|
| 12|     3|2019-09-13 00:00:00|
| 13|     4|2019-09-14 00:00:00|
| 14|     5|2019-09-15 00:00:00|
| 15|     6|2019-09-16 00:00:00|
| 16|     7|2019-09-17 00:00:00|
| 17|     8|2019-09-18 00:00:00|
| 18|     9|2019-09-19 00:00:00|
| 19|    10|2019-09-20 00:00:00|
| 20|     1|2019-09-21 00:00:00|
+---+------+-------------------+
only showing top 20 rows



In [55]:
# show user transaction dataframe
df_user_transaction.show()

+-------+--------------+
|user_id|transaction_id|
+-------+--------------+
|      1|             1|
|      2|             2|
|      3|             3|
|      4|             4|
|      5|             5|
|      6|             6|
|      7|             7|
|      8|             8|
|      9|             9|
|     10|            10|
|     11|            11|
|     12|            12|
|     13|            13|
|     14|            14|
|     15|            15|
|     16|            16|
|     17|            17|
|     18|            18|
|     19|            19|
|     20|            20|
+-------+--------------+
only showing top 20 rows



### Save pyspark dataframe as parquet file

In [56]:
# define function of saving pyspark dataframe
def save_pyspark_dataframe(df_data, folder, format='csv'):
    saved_folder = hdfs_base_uri + '/' + folder
    if format == 'csv':
        df_data.write.csv(saved_folder, header=True)
    elif format == 'parquet':
        df_data.write.parquet(saved_folder)
    return saved_folder

In [59]:
# command to delete parquet and csv files 
'''
hdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/user_parq
hdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/user_csv
hdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/transaction_parq
hdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/transaction_csv
hdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/user_transaction_parq
hdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/user_transaction_csv
'''

'\nhdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/user_parq\nhdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/user_csv\nhdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/transaction_parq\nhdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/transaction_csv\nhdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/user_transaction_parq\nhdfs dfs -rm -r hdfs://node-master:9000/user/hadoop/spark_examples/user_transaction_csv\n'

In [60]:
# save user, transaction and user transaction dataframes as parquet files
save_pyspark_dataframe(df_user, user_parq_folder, 'parquet')
save_pyspark_dataframe(df_transaction, transaction_parq_folder, 'parquet')
save_pyspark_dataframe(df_user_transaction, user_transaction_parq_folder, 'parquet')

'hdfs://node-master:9000//user/hadoop/spark_examples/user_transaction_parq'

In [61]:
# save user, transaction and user transaction dataframes as csv files
save_pyspark_dataframe(df_user, user_csv_folder, 'csv')
save_pyspark_dataframe(df_transaction, transaction_csv_folder, 'csv')
save_pyspark_dataframe(df_user_transaction, user_transaction_csv_folder, 'csv')

'hdfs://node-master:9000//user/hadoop/spark_examples/user_transaction_csv'

### Load pyspark dataframes from parquet or csv files and register as temporary views/tables

In [62]:
# load a parquet folder as a table/view
def load_table_from_parquet_file(table_name, parquet_path):
    loaded = False
    try:
        lc_parquet_file = spark_session.read.option("mergeSchema", "true").parquet(parquet_path)
        lc_parquet_file.createOrReplaceTempView(table_name)
        loaded = True
    except:
        pass
    return loaded

In [63]:
# load a csv folder as a table/view
def load_table_from_csv_file(table_name, csv_path):
    loaded = False
    try:
        lc_csv_file = spark_session.read.option("mergeSchema", "true").csv(csv_path, header=True)
        lc_csv_file.createOrReplaceTempView(table_name)
        loaded = True
    except:
        pass
    return loaded

In [64]:
# load user data from parquet file with the table name of user
load_table_from_parquet_file('user', hdfs_base_uri + '/' + user_parq_folder)

True

In [65]:
# load transaction data from csv file with the table name of transaction
load_table_from_csv_file('transaction', hdfs_base_uri + '/' + transaction_csv_folder)

True

In [66]:
# load user transaction data from parquet file with the table name of user_transaction
load_table_from_parquet_file('user_transaction', hdfs_base_uri + '/' + user_transaction_parq_folder)

True

### Use pyspak sql queries to gather and manipulate data
**join user, transaction by using user_transaction table and find the latest transaction**

In [67]:
# function to register pyspark dataframe as a table/view
def register_dataframe_as_table(df_data, table_name):
    loaded = False
    try:
        df_data.createOrReplaceTempView(table_name)
        loaded = True
    except:
        pass
    return loaded

In [68]:
# sql to join user and transaction through user_transaction and sort user transaction by created_date of transaction
# so the latest transacton will be on top
sql = "select u.id u_id,t.id t_id, t.created_date" \
        " from user_transaction ut " \
        " left join user u on ut.user_id=u.id" \
        " left join transaction t on ut.transaction_id=t.id " \
        " order by t.created_date desc " 
sql

'select u.id u_id,t.id t_id, t.created_date from user_transaction ut  left join user u on ut.user_id=u.id left join transaction t on ut.transaction_id=t.id  order by t.created_date desc '

In [69]:
df_user_transaction_sorted = spark_session.sql(sql)

In [70]:
# show sorted user transaction
df_user_transaction_sorted.show()

+----+----+--------------------+
|u_id|t_id|        created_date|
+----+----+--------------------+
|  79| 179|2019-09-30T00:00:...|
|  49| 149|2019-09-30T00:00:...|
|  69| 569|2019-09-30T00:00:...|
|  99| 599|2019-09-30T00:00:...|
|  29|  29|2019-09-30T00:00:...|
|  29| 629|2019-09-30T00:00:...|
|  59| 659|2019-09-30T00:00:...|
|  89| 689|2019-09-30T00:00:...|
|  19| 719|2019-09-30T00:00:...|
|  39| 539|2019-09-30T00:00:...|
|   9| 809|2019-09-30T00:00:...|
|  39| 839|2019-09-30T00:00:...|
|  69| 869|2019-09-30T00:00:...|
|  99| 899|2019-09-30T00:00:...|
|  29| 929|2019-09-30T00:00:...|
|  59| 959|2019-09-30T00:00:...|
|  89| 989|2019-09-30T00:00:...|
|  49| 749|2019-09-30T00:00:...|
|   9| 509|2019-09-30T00:00:...|
|  79| 779|2019-09-30T00:00:...|
+----+----+--------------------+
only showing top 20 rows



In [71]:
# register the dataframe of the sorted user transaction as a table: user_transaction_sorted
register_dataframe_as_table(df_user_transaction_sorted, 'user_transaction_sorted')

True

In [72]:
# sql to find the latest transation id
sql = "select u_id, first(t_id) latest_t_id, first(created_date) created_date " \
        " from user_transaction_sorted uts " \
        " group by u_id"
sql 

'select u_id, first(t_id) latest_t_id, first(created_date) created_date  from user_transaction_sorted uts  group by u_id'

In [73]:
df_user_latest_transaction = spark_session.sql(sql)

In [74]:
# show user latest transaction
df_user_latest_transaction.show()

+----+-----------+--------------------+
|u_id|latest_t_id|        created_date|
+----+-----------+--------------------+
|  26|        626|2019-09-27T00:00:...|
|  29|        629|2019-09-30T00:00:...|
|  65|        265|2019-09-26T00:00:...|
|  19|        719|2019-09-30T00:00:...|
|  54|        654|2019-09-25T00:00:...|
|  22|        622|2019-09-23T00:00:...|
|   7|        507|2019-09-28T00:00:...|
|  77|        777|2019-09-28T00:00:...|
|  34|        534|2019-09-25T00:00:...|
|  50|        650|2019-09-21T00:00:...|
|  94|        594|2019-09-25T00:00:...|
|  57|        657|2019-09-28T00:00:...|
|  32|         32|2019-09-03T00:00:...|
|  43|        143|2019-09-24T00:00:...|
|  84|        684|2019-09-25T00:00:...|
|  31|        531|2019-09-22T00:00:...|
|  39|        539|2019-09-30T00:00:...|
|  98|        798|2019-09-19T00:00:...|
|  25|         25|2019-09-26T00:00:...|
|  95|        795|2019-09-16T00:00:...|
+----+-----------+--------------------+
only showing top 20 rows



### Use python function to calculate field for each row
**cacluate user age based on user date of birth**

In [119]:
# function to calculate the age
curr_datetime = datetime.now()
def calculate_age_from_dob(row):
    age = None
    dob_datetime = row.date_of_birth
    try:
        curr_month_datetime = datetime(2018, curr_datetime.month, curr_datetime.day)
        dob_month_datetime = datetime(2018, dob_datetime.month, dob_datetime.day)

        age = curr_datetime.year - dob_datetime.year 
        age = age if curr_month_datetime >= dob_month_datetime else age - 1
    except:
        print('out of range' + str(dob_str))
    return Row(id=row.id, age=age)

In [120]:
# only select date of birth from user table
sql = "select id, date_of_birth " \
        " from user"
sql

'select id, date_of_birth  from user'

In [121]:
df_user_dob = spark_session.sql(sql)

In [122]:
df_user_dob.show()

+---+-------------------+
| id|      date_of_birth|
+---+-------------------+
|  1|1961-02-02 00:00:00|
|  2|1962-03-03 00:00:00|
|  3|1963-04-04 00:00:00|
|  4|1964-05-05 00:00:00|
|  5|1965-06-06 00:00:00|
|  6|1966-07-07 00:00:00|
|  7|1967-08-08 00:00:00|
|  8|1968-09-09 00:00:00|
|  9|1969-10-10 00:00:00|
| 10|1970-11-11 00:00:00|
| 11|1971-12-12 00:00:00|
| 12|1972-01-13 00:00:00|
| 13|1973-02-14 00:00:00|
| 14|1974-03-15 00:00:00|
| 15|1975-04-16 00:00:00|
| 16|1976-05-17 00:00:00|
| 17|1977-06-18 00:00:00|
| 18|1978-07-19 00:00:00|
| 19|1979-08-20 00:00:00|
| 20|1960-09-21 00:00:00|
+---+-------------------+
only showing top 20 rows



In [123]:
# apply the function calculate_age_from_dob for each row of dataframe
df_user_age = df_user_dob.rdd.map(calculate_age_from_dob)

In [124]:
df_user_age = spark_session.createDataFrame(df_user_age)

In [126]:
# show calculated age dataframe
df_user_age.select('id', 'age').show()

+---+---+
| id|age|
+---+---+
|  1| 58|
|  2| 57|
|  3| 56|
|  4| 55|
|  5| 54|
|  6| 53|
|  7| 52|
|  8| 51|
|  9| 49|
| 10| 48|
| 11| 47|
| 12| 47|
| 13| 46|
| 14| 45|
| 15| 44|
| 16| 43|
| 17| 42|
| 18| 41|
| 19| 40|
| 20| 59|
+---+---+
only showing top 20 rows



### Use python function to scan the whole table for each row
**De-duplicate the users by matching the first name and the last name. For each row in user table, generate a field called superior_id by find the user with the latest modified_date among the matched users**

In [82]:
# convert pyspark dataframe to pandas dataframe, which will be scanned for each user
pdf_user = df_user.toPandas()

In [None]:
pdf_user.head()

In [103]:
# function to find the superior_id
def find_superior_user(row):
    first_name, last_name = row.first_name, row.last_name
    pdf_matched_users = pdf_user[(pdf_user['first_name'] == first_name) & (pdf_user['last_name'] == last_name)]
    pdf_matched_users = pdf_matched_users.sort_values(by=['modified_date'], ascending=False)
    return Row(id=row.id, superior_id=int(pdf_matched_users.iloc[0,:]['id']))

In [104]:
# apply the function of finding superior user to the user dataframe
df_superior_user = df_user.rdd.map(find_superior_user)
df_superior_user = spark_session.createDataFrame(df_superior_user)

In [105]:
df_superior_user.show()

+---+-----------+
| id|superior_id|
+---+-----------+
|  1|         21|
|  2|         22|
|  3|         23|
|  4|         24|
|  5|         25|
|  6|         26|
|  7|         27|
|  8|         28|
|  9|         29|
| 10|         20|
| 11|         21|
| 12|         22|
| 13|         23|
| 14|         24|
| 15|         25|
| 16|         26|
| 17|         27|
| 18|         28|
| 19|         29|
| 20|         20|
+---+-----------+
only showing top 20 rows



In [106]:
# register superior user dataframe as a table
register_dataframe_as_table(df_superior_user, 'superior_user')

True

In [107]:
# show user details and the details of the superior user
sql = "select u.id, u.first_name, u.last_name, u.modified_date, su.superior_id, " \
        " sud.first_name s_first_name, sud.last_name s_last_name, sud.modified_date s_modified_date" \
        " from superior_user su " \
        " left join user u on su.id = u.id " \
        " left join user sud on sud.id = su.superior_id "
sql

'select u.id, u.first_name, u.last_name, u.modified_date, su.superior_id,  sud.first_name s_first_name, sud.last_name s_last_name, sud.modified_date s_modified_date from superior_user su  left join user u on su.id = u.id  left join user sud on sud.id = su.superior_id '

In [108]:
df_superior_user_details = spark_session.sql(sql)

In [109]:
df_superior_user_details.show()

+---+----------+---------+-------------------+-----------+------------+-----------+-------------------+
| id|first_name|last_name|      modified_date|superior_id|s_first_name|s_last_name|    s_modified_date|
+---+----------+---------+-------------------+-----------+------------+-----------+-------------------+
|  1|   fname_2|  lname_2|2019-09-02 00:00:00|         21|     fname_2|    lname_2|2019-09-22 00:00:00|
|  2|   fname_3|  lname_3|2019-09-03 00:00:00|         22|     fname_3|    lname_3|2019-09-23 00:00:00|
|  3|   fname_4|  lname_4|2019-09-04 00:00:00|         23|     fname_4|    lname_4|2019-09-24 00:00:00|
|  4|   fname_5|  lname_5|2019-09-05 00:00:00|         24|     fname_5|    lname_5|2019-09-25 00:00:00|
|  5|   fname_6|  lname_6|2019-09-06 00:00:00|         25|     fname_6|    lname_6|2019-09-26 00:00:00|
|  6|   fname_7|  lname_7|2019-09-07 00:00:00|         26|     fname_7|    lname_7|2019-09-27 00:00:00|
|  7|   fname_8|  lname_8|2019-09-08 00:00:00|         27|     f