# Prerequisites

Install Spark and the Apache Kafka connector library in the VM


---



In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip -q install findspark

In [None]:
import os

# Locations of the JDK and of the Spark distribution unpacked by the install cell.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

# Maven coordinates resolved when PySpark launches the JVM.
# Delta Lake 1.0.x is the release line built against Spark 3.1.x; the previously
# used 0.8.0 targets Spark 3.0.x and does not match the Spark 3.1.2 installed above.
_spark_packages = ",".join(
    [
        "io.delta:delta-core_2.12:1.0.0",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2",
    ]
)

# Arguments handed to spark-submit: extra packages, the Delta SQL extension and
# catalog, and the mandatory trailing "pyspark-shell" token.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join(
    [
        "--packages",
        _spark_packages,
        "--conf",
        "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
        "--conf",
        "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "pyspark-shell",
    ]
)

In [None]:
# Make the unpacked Spark distribution importable as `pyspark`.
# NOTE(review): init() is called without arguments, so it presumably locates Spark
# through the SPARK_HOME variable set in the previous cell — confirm against the
# findspark documentation. This cell must run before any `import pyspark`.
import findspark
findspark.init()

Start the Spark session and print its version


---


In [None]:
from pyspark.sql import SparkSession
# NOTE(review): the wildcard import is kept so later cells see the same names, but it
# can shadow Python built-ins (e.g. sum, min, max); prefer explicit imports.
from pyspark.sql.functions import *

# Build (or reuse) a session running in local mode. The UI is moved to port 4050,
# which is the port exposed by the optional ngrok cell below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

# Last expression of the cell: display the Spark version in use.
spark.version

Create an ngrok tunnel to expose the Spark UI (optional)


In [None]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

# Download Datasets

In [None]:
!mkdir -p /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/trades.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/trades.json -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/offshore_leaks.edges.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/offshore_leaks.nodes.address.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/offshore_leaks.nodes.intermediary.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/offshore_leaks.nodes.officer.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/offshore_leaks.nodes.entity.csv -P /dataset
!ls /dataset

# Project 1 - Regulatory Banking Project
---

Input files: /dataset/trades.csv & /dataset/trades.json


# Project 2 - Transactions Notifications

*Hint: https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html*

In [None]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Streaming DataFrame subscribed to the "transactions" topic of the course Kafka broker.
# The Kafka source is provided by the spark-sql-kafka package listed in PYSPARK_SUBMIT_ARGS.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ec2-3-231-22-58.compute-1.amazonaws.com:9092")
    .option("subscribe", "transactions")
    .load()
)

In [None]:
# Field names of the JSON payload carried in the Kafka message value. The leading and
# trailing spaces in the two amount fields are part of the names and must be kept.
_payload_fields = [
    'Account No',
    'DATE',
    'TRANSACTION DETAILS',
    'CHQ.NO.',
    'VALUE DATE',
    ' WITHDRAWAL AMT ',
    ' DEPOSIT AMT ',
    'BALANCE AMT',
]

# Every payload field is declared as a nullable string.
schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in _payload_fields]
)

# Show the raw schema of the Kafka source before the payload is parsed.
df.printSchema()

# 1) cast the binary key/value to strings and keep the Kafka timestamp,
# 2) parse the JSON value with the schema above,
# 3) flatten the parsed struct into top-level columns.
kafka_strings = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
kafka_parsed = kafka_strings.withColumn("value", from_json("value", schema))
dataset = kafka_parsed.select(col('key'), col("timestamp"), col('value.*'))

In [None]:
# Start a streaming query that writes `dataset_count` to an in-memory table named
# "transactions", which can then be queried with spark.sql.
# NOTE(review): `dataset_count` is not defined in any visible cell; it is presumably
# the aggregation over `dataset` that this exercise asks for and must be created
# before running this cell, otherwise it raises NameError.
# NOTE(review): "truncate" looks like a console-sink option — confirm that it has
# any effect on the memory sink.
(
    dataset_count.writeStream
    .outputMode("update")
    .format("memory")
    .option("truncate", "false")
    .queryName("transactions")
    .start()
)

In [None]:
# Query the in-memory table registered by the streaming query (queryName "transactions")
# and print its current contents without truncating long column values.
transactions_snapshot = spark.sql("select * from transactions")
transactions_snapshot.show(truncate=False)

# Project 3 - Panama Papers

Trace the "Spring Song International Co., Ltd." entity with Spark SQL using the following datasets:<br>
/dataset/offshore_leaks.nodes.entity.csv<br>
/dataset/offshore_leaks.nodes.intermediary.csv<br>
/dataset/offshore_leaks.edges.csv<br>
/dataset/offshore_leaks.nodes.officer.csv