#Intuit Spark assignment

## Initilize Spark environment

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
file = '/content/drive/MyDrive/Colab Notebooks/Spark/payments-input.snappy.parquet'

## Initilize PySpark Session

In [7]:
spark = None
import findspark
findspark.init()
from pyspark.sql import SparkSession
if spark:
  del spark
spark = SparkSession.builder.appName("Q6").getOrCreate()
#spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

## Loading parquest file

In [8]:
df = spark.read.parquet(file)

## Data review - check for types and nulls

In [9]:
#schema
df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- currency: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- encoded_user_id: binary (nullable = true)



In [10]:
# check if df consists null values
# create a dictional for each column and nan count
columns_nulls_dict = {column:df.filter(df[column].isNull()).count() for column in df.columns}
columns_nulls_dict

{'date': 0, 'currency': 0, 'amount': 0, 'encoded_user_id': 0}

In [11]:
df.show()

+-------------------+--------+------+--------------------+
|               date|currency|amount|     encoded_user_id|
+-------------------+--------+------+--------------------+
|2022-10-16 15:00:00|     ILS|1000.0|[67 41 41 41 41 4...|
|2022-10-16 15:00:00|     ILS|  13.5|[67 41 41 41 41 4...|
|2022-10-04 15:00:00|     EUR| 120.0|[67 41 41 41 41 4...|
|2022-10-04 15:00:00|     USD|  10.0|[67 41 41 41 41 4...|
|2022-10-22 15:00:00|     EUR|   9.0|[67 41 41 41 41 4...|
|2022-10-09 15:00:00|     USD|   1.0|[67 41 41 41 41 4...|
|2022-10-22 15:00:00|     EUR| 244.0|[67 41 41 41 41 4...|
|2022-10-09 15:00:00|     USD| 200.0|[67 41 41 41 41 4...|
|2022-10-16 15:00:00|     USD| 378.0|[67 41 41 41 41 4...|
|2022-10-04 15:00:00|     ILS|  11.0|[67 41 41 41 41 4...|
+-------------------+--------+------+--------------------+



## Answers

### 6.1 - Agg & Groupby
Show the total sum amount for each currency.

#### PySpark methods

In [12]:
# pyspark methods
from pyspark.sql.functions import sum
# Groupby currency than aggregate with sum function on amount
df.groupBy('currency').agg(sum('amount')).show()

+--------+-----------+
|currency|sum(amount)|
+--------+-----------+
|     EUR|      373.0|
|     ILS|     1024.5|
|     USD|      589.0|
+--------+-----------+



#### PySpark SQL

In [13]:
TABLE_NAME = 'payments'
# register the df as sql temp view
df.createOrReplaceTempView(TABLE_NAME)
SQL_QUERY = '''
SELECT currency , sum(amount)
FROM from payments
GROUP BY currency
'''
results = spark.sql(SQL_QUERY)
results.show()

+--------+------+
|currency|  FROM|
+--------+------+
|     EUR| 373.0|
|     ILS|1024.5|
|     USD| 589.0|
+--------+------+



### 6.2 - Join
Create a DataFrame of exchange rates for every currency to ILS, and use it to add a new column to the payments table, named amount_in_ils. 

In [14]:
# create exchange_rates data
exchange_rates = [('USD',3.6),
                  ('ILS',1.0),
                  ('EUR',4.0)]


# Create schema
from pyspark.sql.types import StructField, StructType, StringType, FloatType
schema = StructType([ 
      StructField('currency', StringType(), True),
      StructField('exchange_rate', FloatType(),True)
      ])


# create df and apply schema
exchange_rates_df = spark.createDataFrame(data=exchange_rates,schema=schema)
exchange_rates_df.printSchema()
exchange_rates_df.show()

root
 |-- currency: string (nullable = true)
 |-- exchange_rate: float (nullable = true)

+--------+-------------+
|currency|exchange_rate|
+--------+-------------+
|     USD|          3.6|
|     ILS|          1.0|
|     EUR|          4.0|
+--------+-------------+



In [16]:
# join two dataframes and create new DF
# format number with 2 decimal places
from pyspark.sql.functions import format_number, col
 
payments_df = df.join(exchange_rates_df,on='currency').withColumn(
    'amount_in_ils',format_number(col('amount')*col('exchange_rate'),2)
    )
payments_df.show()

+--------+-------------------+------+--------------------+-------------+-------------+
|currency|               date|amount|     encoded_user_id|exchange_rate|amount_in_ils|
+--------+-------------------+------+--------------------+-------------+-------------+
|     USD|2022-10-16 15:00:00| 378.0|[67 41 41 41 41 4...|          3.6|     1,360.80|
|     USD|2022-10-09 15:00:00| 200.0|[67 41 41 41 41 4...|          3.6|       720.00|
|     USD|2022-10-09 15:00:00|   1.0|[67 41 41 41 41 4...|          3.6|         3.60|
|     USD|2022-10-04 15:00:00|  10.0|[67 41 41 41 41 4...|          3.6|        36.00|
|     ILS|2022-10-04 15:00:00|  11.0|[67 41 41 41 41 4...|          1.0|        11.00|
|     ILS|2022-10-16 15:00:00|  13.5|[67 41 41 41 41 4...|          1.0|        13.50|
|     ILS|2022-10-16 15:00:00|1000.0|[67 41 41 41 41 4...|          1.0|     1,000.00|
|     EUR|2022-10-22 15:00:00| 244.0|[67 41 41 41 41 4...|          4.0|       976.00|
|     EUR|2022-10-22 15:00:00|   9.0|[67 41

### 6.3 - UDF

In [17]:
#!pip install cryptography==38.0.3

In [18]:
# import given function
from cryptography.fernet import Fernet

def decode(encoded_used_id: bytes) -> str:
   key = b'CuTaE-KQM5MOZkExifXvfssUzXxU4TtNQyiggxCh8G8='
   used_id: str = Fernet(key).decrypt(encoded_used_id.decode()).decode()
   return used_id


In [39]:
from pyspark.sql.functions import udf
decode_udf = udf(lambda x: decode(x), StringType())

# decode using UDF and given function
payments_df.select(decode_udf(col('encoded_user_id'))).show()

# add new column to payments_df using withColumn
payments_df = payments_df.withColumn('decoded_user_id',
                       decode_udf(col('encoded_user_id'))
                    )
payments_df.show()

+-------------------------+
|<lambda>(encoded_user_id)|
+-------------------------+
|         Dmitry Burshtein|
|                Sagi Vegh|
|                  Tom Mor|
|                Dana Assa|
|              Itay Granik|
|         Michael Livshits|
|          Osnat Haj Yahia|
|             Hersh Shefer|
|                Dana Assa|
|               Uri Shohet|
+-------------------------+

+--------+-------------------+------+--------------------+-------------+-------------+----------------+
|currency|               date|amount|     encoded_user_id|exchange_rate|amount_in_ils| decoded_user_id|
+--------+-------------------+------+--------------------+-------------+-------------+----------------+
|     USD|2022-10-16 15:00:00| 378.0|[67 41 41 41 41 4...|          3.6|     1,360.80|Dmitry Burshtein|
|     USD|2022-10-09 15:00:00| 200.0|[67 41 41 41 41 4...|          3.6|       720.00|       Sagi Vegh|
|     USD|2022-10-09 15:00:00|   1.0|[67 41 41 41 41 4...|          3.6|         3.60| 