# Channel VAS Data assignment 

Author: Konstantinos Razgkelis 

Date:   Friday, October 22, 2021
 

Step 1: We import the PySpark SQL modules and pandas, which will be used throughout our project.

In [52]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, StringType, StructField, StructType, TimestampType
import pandas as pd

Step 2: We need to start a Spark session.

In [51]:
# Connect to the standalone Spark master and cap executor memory at 512 MB.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

Step 3: We define the schema of the file that will be read from HDFS, assigning a data type to each column. We then clean the data by filtering out every row that has a null value in any column or whose channel contains no word characters.

In [133]:
csv_schema = StructType([
    StructField('timestamp', TimestampType(), True),
    StructField('amount', DecimalType(scale=4), True),
    StructField('channel', StringType(), True)
])

initial_data = spark.read.csv(path="/opt/workspace/cvas_data.csv", sep=",", header=False, schema=csv_schema)

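# Keep only rows where every column is populated and the channel contains
# at least one word character (rlike performs a partial regex match).
clean_data = initial_data.filter(
    initial_data["timestamp"].isNotNull()
    & initial_data["amount"].isNotNull()
    & initial_data["channel"].isNotNull()
    & initial_data["channel"].rlike(r"\w+")
)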
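
To make the cleaning rule concrete, here is a minimal plain-Python sketch of the same logic, run on a handful of made-up rows rather than on cvas_data.csv. It assumes Spark's default permissive CSV parsing, where an empty field or a field that does not fit the declared type is read as null. The names SAMPLE_ROWS, parse_row and is_clean, as well as the sample values, are hypothetical and exist only for illustration. re.search is used because rlike also matches anywhere in the string.

```python
import re
from datetime import datetime
from decimal import Decimal, InvalidOperation

# Hypothetical rows in the same layout as the CSV: timestamp, amount, channel.
SAMPLE_ROWS = [
    ("2021-08-15 23:09:14", "0.75", "SMS"),
    ("2021-08-16 02:32:25", "1.5", "SMS"),
    ("not-a-date", "0.75", "SMS"),          # timestamp cannot be parsed
    ("2021-08-16 08:46:51", "", "SMS"),     # amount is missing
    ("2021-08-16 07:16:02", "0.75", ""),    # channel is empty
    ("2021-08-16 07:16:02", "0.75", "--"),  # channel has no word character
]

WORD = re.compile(r"\w+")


def parse_row(raw):
    """Apply the schema; a field that cannot be parsed becomes None."""
    ts_text, amount_text, channel = raw
    try:
        ts = datetime.strptime(ts_text, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        ts = None
    try:
        # Four decimal places, mirroring DecimalType(scale=4).
        amount = Decimal(amount_text).quantize(Decimal("0.0001"))
    except InvalidOperation:
        amount = None
    return ts, amount, (channel or None)


def is_clean(row):
    """Same four conditions as the filter on initial_data."""
    ts, amount, channel = row
    return (
        ts is not None
        and amount is not None
        and channel is not None
        and WORD.search(channel) is not None
    )


parsed = [parse_row(r) for r in SAMPLE_ROWS]
clean = [r for r in parsed if is_clean(r)]
print(len(parsed), len(clean))        # 6 2
print([str(r[1]) for r in clean])     # ['0.7500', '1.5000']
```

Only the first two sample rows survive. The amounts print with four decimal places, which is also why the amount column appears as 0.7500 and 1.5000 in the query output.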



Bonus step #1: Check the size of the initial and cleaned data.

In [127]:
initial_data.count()

104

In [142]:
clean_data.count()

99

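Step 4: We save the cleaned Spark DataFrame to HDFS as a Parquet file. To do this, we first convert it to a pandas DataFrame and then let pandas write the file. Note that toPandas() collects every row onto the driver, which is acceptable for a dataset this small.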

In [134]:
pd_df = clean_data.toPandas()

In [135]:
pd_df.to_parquet("/opt/workspace/output.parquet", index=False)

Step 5: Our output file is now in HDFS. We read it back into Spark, register it as a temporary view named parquet, and print its schema.


In [136]:
parquet = spark.read.parquet("file:///opt/workspace/output.parquet")
parquet.createOrReplaceTempView("parquet")
parquet.printSchema()

Step 6: We can now run SQL queries against the view to get results from our data.

In [153]:
spark.sql("SELECT * FROM parquet WHERE amount > 0.3 AND channel = 'SMS'").show()

+-------------------+------+-------+
|          timestamp|amount|channel|
+-------------------+------+-------+
|2021-08-15 23:09:14|0.7500|    SMS|
|2021-08-15 23:06:27|1.5000|    SMS|
|2021-08-16 08:46:51|0.7500|    SMS|
|2021-08-16 02:59:14|0.7500|    SMS|
|2021-08-16 07:16:02|0.7500|    SMS|
|2021-08-16 08:10:23|0.7500|    SMS|
|2021-08-16 02:36:15|0.7500|    SMS|
|2021-08-16 01:45:30|0.7500|    SMS|
|2021-08-16 05:03:57|0.7500|    SMS|
|2021-08-16 06:37:47|0.7500|    SMS|
|2021-08-16 00:42:31|0.7500|    SMS|
|2021-08-16 02:32:25|1.5000|    SMS|
|2021-08-16 06:47:11|0.7500|    SMS|
|2021-08-16 05:08:18|0.7500|    SMS|
|2021-08-16 06:19:31|0.7500|    SMS|
|2021-08-15 23:43:09|0.7500|    SMS|
|2021-08-16 06:20:50|1.5000|    SMS|
|2021-08-16 08:00:19|1.5000|    SMS|
|2021-08-16 03:04:01|1.5000|    SMS|
|2021-08-16 06:27:55|0.7500|    SMS|
+-------------------+------+-------+
only showing top 20 rows
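
The predicate of this query can also be tried out without a cluster. The sketch below is not Spark SQL: it swaps in Python's built-in sqlite3 module and a few made-up rows (including a hypothetical USSD channel) to show which rows the WHERE clause keeps. The table is named parquet only to mirror the view above, and the amounts are stored as REAL here rather than as decimals.

```python
import sqlite3

# In-memory database standing in for the Spark temporary view.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE parquet (timestamp TEXT, amount REAL, channel TEXT)")

# Hypothetical sample rows, not taken from cvas_data.csv.
rows = [
    ("2021-08-15 23:09:14", 0.75, "SMS"),
    ("2021-08-16 02:32:25", 1.50, "SMS"),
    ("2021-08-16 03:00:00", 0.25, "SMS"),   # amount below the threshold
    ("2021-08-16 04:00:00", 0.75, "USSD"),  # different channel
]
conn.executemany("INSERT INTO parquet VALUES (?, ?, ?)", rows)

# Same predicate as the Spark query: amount above 0.3 and channel SMS.
result = conn.execute(
    "SELECT timestamp, amount, channel FROM parquet "
    "WHERE amount > 0.3 AND channel = 'SMS' ORDER BY timestamp"
).fetchall()
conn.close()

for row in result:
    print(row)
```

Only the first two rows are returned: the third fails the amount condition and the fourth fails the channel condition.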



Step 7: We have finished our job, so we stop the Spark session so that Spark can release its resources and clean up its temporary files.

In [None]:
spark.stop()