## Spark Optimization Analysis
### Data source: This public Los Angeles parking [ticket data](https://data.lacity.org/Transportation/Parking-Citations/4f5p-udkv/about_data) was sourced from an API in conjunction with a volunteer [HackforLA](https://www.hackforla.org/) civic project.

In [None]:
import json
import http.client
import requests
import sys
import pandas as pd
import os
import shutil
from pathlib import Path
from datetime import datetime
from pyspark.sql import SparkSession,types
from pyspark import SQLContext,SparkContext,SparkConf
# Lift pandas' display limits (rows, columns, column width) so results
# converted to pandas are rendered without truncation.
for _display_option in (
    'display.max_rows',
    'display.max_columns',
    'display.max_colwidth',
):
    pd.set_option(_display_option, None)

In [None]:
# Setting Configurations
# Resource settings: 4g for the executor, 4g for the driver, and a cap of
# 2 cores. SparkConf.set returns the conf itself, so the calls are chained.
conf = (
    SparkConf()
    .set("spark.executor.memory", "4g")
    .set("spark.driver.memory", "4g")
    .set("spark.cores.max", "2")
)

In [None]:
# Starting Spark Session
# Build (or reuse) the session. `conf` carries the memory/core settings
# defined earlier in the notebook; it has to be handed to the builder
# explicitly or those settings are never applied. The PostgreSQL JDBC driver
# package is added so the JDBC writes to Postgres can load
# org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("ReadCSVFile")
    .config(conf=conf)
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .getOrCreate()
)

# SparkSession is already the SQL entry point. Its constructor expects a
# SparkContext, not another session, so expose the same session under the
# legacy `sqlContext` name instead of wrapping it.
sqlContext = spark

# Keep the notebook output readable: only warnings and errors from Spark.
spark.sparkContext.setLogLevel("WARN")

# Show the effective configuration as the cell output.
spark.sparkContext.getConf().getAll()


In [None]:
# Read in file
# The first CSV row is used as the header (column names).
file_path = 'Parking_Citations.csv'
df_spark = spark.read.option("header", True).csv(file_path)

# Write raw table to Postgres
# Connection settings for the JDBC sink, collected in one place.
jdbc_options = {
    "url": "jdbc:postgresql://spark-postgres-1:5432/sparkdb",
    "driver": "org.postgresql.Driver",
    "dbtable": 'public."luck-parking-raw"',
    "user": "postgres",
    "password": "postgres",
}

# "overwrite" replaces the target table's contents on every run.
raw_writer = df_spark.write.format("jdbc").options(**jdbc_options).mode("overwrite")
raw_writer.save()

In [None]:
# Basic Validation and Exploration
# Register the raw DataFrame under the name "view" so the checks below can
# be written as plain SQL.
df_spark.createOrReplaceTempView("view")

# Row count
row_count_sql = "SELECT COUNT(*) from view"
count = spark.sql(row_count_sql)
count.show()

# Fields
# `schema.names` is a plain Python list of column names, not a DataFrame,
# so it has no .show() method; print the list instead.
column_names = spark.sql(
    "SELECT * FROM view"
).schema.names

print(column_names)

# Distinct ID count
# Number of unique ticket_number values in the view.
distinct_id_sql = """
    SELECT COUNT(DISTINCT ticket_number)
    FROM view 
"""
distinct_id_count = spark.sql(distinct_id_sql)
distinct_id_count.show()

# Checking for ID dups
# Count the ticket_number values that occur more than once. The inner query
# tallies rows per ticket_number; the outer query keeps only the tallies
# above 1, so a result of 0 means every ticket_number is unique.
finding_dubs = spark.sql("""
    SELECT count(*) FROM
    (SELECT ticket_number, COUNT(*) as dups
    FROM view
    GROUP BY ticket_number)
    WHERE dups > 1
""")
finding_dubs.show()

# High level anomalies
# Pull rows whose issue_date substring (from position 7) compares >= '2016'
# and whose meter_id equals 37, largest fines first.
# NOTE(review): the substring offset presumes a date layout with the year
# starting at character 7 (e.g. MM/DD/YYYY) - confirm against the data.
# The CSV is read without a schema or inferSchema, so fine_amount is a string
# column; cast it so the ordering is numeric rather than lexicographic
# (as strings, '93' would sort above '163').
res = spark.sql("""
    SELECT * 
    FROM view
    WHERE substring(issue_date,7,5) >= '2016' 
    AND meter_id=37
    ORDER BY CAST(fine_amount AS DOUBLE) DESC    
""").toPandas()
res
