In [0]:
# import necessary libraries
# URL processing (urllib.parse is a submodule and has to be imported explicitly)
import urllib
import urllib.parse

from pyspark.sql.types import *
# pyspark functions
from pyspark.sql.functions import *

##########################################################################################################################################
##################################################### Data Reading & Combining Section ###################################################
##########################################################################################################################################


# Define file type
file_type = "csv"
# Whether the file has a header
first_row_is_header = "true"
# Delimiter used in the file
delimiter = ","
# Read the credentials CSV file into a spark dataframe
# NOTE(review): the AWS keys are read from a plain CSV under /FileStore and later embedded in the
# mount URL; consider Databricks secrets or an instance profile instead - confirm with the workspace owner.
aws_keys_df = spark.read.format(file_type)\
.option("header", first_row_is_header)\
.option("sep", delimiter)\
.load("/FileStore/tables/new_user_credentials.csv")

# User whose access keys are used to mount the bucket
AWS_USER_NAME = 'test_user'
# Fetch both keys with a single Spark job; first() returns None when no row matches
credentials_row = (
    aws_keys_df
    .where(col('User name') == AWS_USER_NAME)
    .select('Access key ID', 'Secret access key')
    .first()
)
if credentials_row is None:
    raise ValueError(
        "No credentials found for user '{0}' in /FileStore/tables/new_user_credentials.csv".format(AWS_USER_NAME)
    )
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key (safe="" so that "/" is escaped as well) before placing it in the source URL
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# AWS S3 bucket name
AWS_S3_BUCKET = "marketbasketbucket"
# Mount name for the bucket
MOUNT_NAME = "/mnt/mba/chunks"
# Source url
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the drive only when it is not mounted yet, so that re-running this cell does not fail
if not any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)


# Reading and Combining the csv files
from pyspark.sql.types import *
# File Schema: one row per event in the chunk files
file_schema = StructType([
    StructField('event_time', StringType(), False),
    StructField('event_type', StringType(), True),
    StructField('product_id', IntegerType(), True),
    StructField('category_id', LongType(), True),
    StructField('category_code', StringType(), True),
    StructField('brand', StringType(), True),
    StructField('price', FloatType(), True),
    StructField('user_id', IntegerType(), True),
    StructField('user_session', StringType(), True)
])

# Paths of every entry found under the mounted bucket
chunk_paths = [chunk.path for chunk in dbutils.fs.ls(MOUNT_NAME)]

if chunk_paths:
    # Read all chunks in a single scan instead of unioning one dataframe per file,
    # which keeps the query plan flat no matter how many chunks there are.
    maindf = spark.read.csv(path=chunk_paths, header=True, schema=file_schema)
else:
    # Nothing to read: keep an empty dataframe with the expected schema
    maindf = spark.createDataFrame([], file_schema)
    
    
#Show the Dataframe if required
#maindf.show(10, truncate=False)




##########################################################################################################################################
##################################################### Data Transformation Section ########################################################
##########################################################################################################################################
#DataFlow

# Compress the data and store in an optimized format.


# Missing values: keep only the events whose category_code is present
nonNAdf = maindf.filter(col('category_code').isNotNull())


#Separate_productTypes
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import isnan, when, count, col

def string_split(string1):
    """Return the last dot-separated segment of ``string1``, stripped of surrounding whitespace.

    Example: ``'electronics.video.tv'`` -> ``'tv'``. A string that contains no
    dot is returned whole (stripped).

    Args:
        string1: Dotted string, or ``None``.

    Returns:
        The stripped final segment, or ``None`` when ``string1`` is ``None``.
    """
    # Guard against null input so the function does not raise AttributeError
    # when it is applied to a value that is missing.
    if string1 is None:
        return None
    return string1.rsplit('.', 1)[-1].strip()

# Wrap string_split as a Spark UDF returning a string
udfstringfunc = F.udf(f=string_split, returnType=StringType())
# productType holds the last segment of the dotted category_code
nonNAdf = nonNAdf.withColumn(
    'productType',
    udfstringfunc(F.col('category_code')),
)




# Events split: one dataframe per event type
nonNAdf_purchase = nonNAdf.filter(F.col('event_type') == 'purchase')
nonNAdf_view = nonNAdf.filter(F.col('event_type') == 'view')
nonNAdf_cart = nonNAdf.filter(F.col('event_type') == 'cart')


# Group the purchases by session: one comma-separated list of the product types bought in each user_session.
# The aggregate is aliased directly rather than being selected afterwards through the column name
# that Spark generates for the expression.
grouped_transactions = (
    nonNAdf_purchase
    .groupBy('user_session')
    .agg(F.concat_ws(",", F.collect_list(nonNAdf_purchase.productType)).alias('transactionslist'))
    .select(col('user_session').alias('sid'), col('transactionslist'))
)


# Join the per-session list back onto every purchase row of that session
grouped_transactions = grouped_transactions.join(
    nonNAdf_purchase,
    grouped_transactions.sid == nonNAdf_purchase.user_session,
    'left',
)
# sid duplicates user_session after the join; category_id is dropped as well
grouped_transactions = grouped_transactions.drop('sid', 'category_id')
grouped_transactions.cache()

# Create a view for the SQL queries
grouped_transactions.createOrReplaceTempView('viewGrouped')

# cache() is lazy, so an action is needed to start filling the cache; show(10) also previews the result.
grouped_transactions.show(10)

# NOTE: the bucket is intentionally NOT unmounted here. maindf is still evaluated lazily from the
# mounted files and is queried again in later cells, so the mount must stay available until the
# dedicated unmounting cell at the end of the notebook runs.

+----------------+--------------------+----------+----------+--------------------+-------+-------+---------+--------------------+-------------+
|transactionslist|          event_time|event_type|product_id|       category_code|  brand|  price|  user_id|        user_session|  productType|
+----------------+--------------------+----------+----------+--------------------+-------+-------+---------+--------------------+-------------+
|              tv|2019-10-31 17:15:...|  purchase|   1801938|electronics.video.tv|   sony|1543.39|514674061|0026da65-d60b-4d7...|           tv|
|        notebook|2019-10-11 02:34:...|  purchase|   1307377|  computers.notebook| lenovo| 434.99|538672167|00326296-8877-47f...|     notebook|
|       subwoofer|2019-10-16 07:43:...|  purchase|   5801483|electronics.audio...|pioneer|  58.95|513593613|0032e941-8533-46a...|    subwoofer|
|      smartphone|2019-10-01 13:19:...|  purchase|   1004173|electronics.smart...| xiaomi| 146.46|551801399|0056e55b-b2fe-4b9...|   smar

## Types of DataFrames:
__maindf:__ Dataframe created by combining chunks

__nonNAdf:__ maindf with the rows that have a missing category_code removed

__1.) nonNAdf_purchase:__ nonNAdf of purchase events

__2.) nonNAdf_view:__ nonNAdf of view events

__3.) nonNAdf_cart:__ nonNAdf of cart events

### Reading from a Parquet Intermediate File

In [0]:
# Load the intermediate transaction list stored as Parquet
file_parquet_df = spark.read.parquet("/tmp/MBA/transactionList.parquet")

# Expose it to the SQL cells under the name transList
file_parquet_df.createOrReplaceTempView('transList')

# Preview the first rows without truncating the transaction strings
file_parquet_df.show(n=10, truncate=False)

+--------------------------------+------+
|transactions                    |count |
+--------------------------------+------+
|smartphone                      |162696|
|smartphone,smartphone           |17893 |
|headphone                       |15662 |
|tv                              |10676 |
|clocks                          |9272  |
|washer                          |8052  |
|notebook                        |7745  |
|vacuum                          |6759  |
|refrigerators                   |5928  |
|smartphone,smartphone,smartphone|3844  |
+--------------------------------+------+
only showing top 10 rows



## Analytical Work, SQL queries

In [0]:
%sql
-- Ten distinct samsung purchase rows, lowest user_id first.
-- user_session is listed ahead of * so it leads the result; it therefore appears twice in the output.
SELECT DISTINCT
  user_session,
  *
FROM viewGrouped
WHERE brand = 'samsung'
ORDER BY user_id
LIMIT 10

user_session,transactionslist,event_time,event_type,product_id,category_code,brand,price,user_id,user_session.1,productType
10456526-1e4c-487b-824b-04dd0d1b73d1,"smartphone,smartphone",2019-10-18 19:07:54 UTC,purchase,1004836,electronics.smartphone,samsung,229.22,403013066,10456526-1e4c-487b-824b-04dd0d1b73d1,smartphone
c28d8661-8cb9-4c27-9f18-cf41d82324bf,smartphone,2019-10-15 07:09:17 UTC,purchase,1004767,electronics.smartphone,samsung,250.82,427391662,c28d8661-8cb9-4c27-9f18-cf41d82324bf,smartphone
69298660-1ae1-4e5f-b7e2-3a5e4346cc03,washer,2019-10-13 10:20:29 UTC,purchase,3600661,appliances.kitchen.washer,samsung,296.02,429474454,69298660-1ae1-4e5f-b7e2-3a5e4346cc03,washer
f320ab60-ce00-4814-8d12-f7710162b45c,smartphone,2019-10-06 08:47:28 UTC,purchase,1004875,electronics.smartphone,samsung,388.13,435648894,f320ab60-ce00-4814-8d12-f7710162b45c,smartphone
71479926-0bfb-48e3-9c03-1c47a47b5554,smartphone,2019-10-07 10:03:29 UTC,purchase,1004857,electronics.smartphone,samsung,130.25,440471930,71479926-0bfb-48e3-9c03-1c47a47b5554,smartphone
b6cbf774-e800-4645-95d0-7cb337e726f3,smartphone,2019-10-18 06:34:47 UTC,purchase,1004870,electronics.smartphone,samsung,286.4,446080337,b6cbf774-e800-4645-95d0-7cb337e726f3,smartphone
44aed8ab-e54d-4a5c-8012-0b4114491bfd,"tv,smartphone",2019-10-03 09:43:25 UTC,purchase,1004858,electronics.smartphone,samsung,132.02,450275087,44aed8ab-e54d-4a5c-8012-0b4114491bfd,smartphone
92d545c0-1956-4aaf-90b8-8bbe7d2b84a4,smartphone,2019-10-04 10:16:00 UTC,purchase,1004858,electronics.smartphone,samsung,132.56,450275087,92d545c0-1956-4aaf-90b8-8bbe7d2b84a4,smartphone
44aed8ab-e54d-4a5c-8012-0b4114491bfd,"tv,smartphone",2019-10-03 09:30:57 UTC,purchase,1801739,electronics.video.tv,samsung,289.58,450275087,44aed8ab-e54d-4a5c-8012-0b4114491bfd,tv
05a03065-ab85-4550-806c-5f2adfae8371,smartphone,2019-10-17 03:45:17 UTC,purchase,1004767,electronics.smartphone,samsung,250.66,462288298,05a03065-ab85-4550-806c-5f2adfae8371,smartphone


In [0]:
import pyspark.sql.functions as F

# Number of events per event type and their share (in percent) of all rows in maindf
total_events = maindf.count()
events_per_type = F.count('event_type')

event_type_summary = maindf.groupby('event_type').agg(
    events_per_type.alias('count'),
    (events_per_type * 100 / total_events).alias('percentage'),
)
event_type_summary.show()

+----------+--------+------------------+
|event_type|   count|        percentage|
+----------+--------+------------------+
|  purchase|  499427|1.7555313912572303|
|      view|27334774| 96.08422017616581|
|      cart|  614564|  2.16024843257695|
+----------+--------+------------------+



In [0]:
%sql
-- Preview the first ten rows of the transaction list view
SELECT
  *
FROM transList
LIMIT 10

transactions,count
smartphone,162696
"smartphone,smartphone",17893
headphone,15662
tv,10676
clocks,9272
washer,8052
notebook,7745
vacuum,6759
refrigerators,5928
"smartphone,smartphone,smartphone",3844


### Unpersist Code

In [0]:
# Release the cached data of grouped_transactions (cached earlier with .cache()).
# unpersist() returns the DataFrame itself, which is why the cell echoes its schema.
grouped_transactions.unpersist()

Out[9]: DataFrame[transactions: string, event_time: string, event_type: string, product_id: int, category_code: string, brand: string, price: float, user_id: int, user_session: string, productType: string]

### Unmounting Code

In [0]:
# Unmount the bucket now that all the work reading from it is done.
# The check makes the cell safe to run when the mount point is already gone
# (e.g. it was unmounted earlier or the cell is run twice); unmounting a missing mount point would raise.
if any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(MOUNT_NAME)

/mnt/mba/chunks has been unmounted.
Out[10]: True