In [None]:
import time
start_time = time.time()
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql.functions import * #for SQL functions
from pyspark.sql.window import Window #for window function
from datetime import datetime

In [None]:
"""Cluster Specifications
5 Nodes
Driver: I3.xlarge
runtime version: 10.3 (includes Apache Spark 3.2.1, Scala 2.12)
30.5 GB Memory, 4 cores
Min workers-2
Max workers-5
"""

In [None]:
# Obtain the SparkSession for this notebook (reuses an existing one if present).
spark = (
    SparkSession.builder
    .appName("dds")
    .getOrCreate()
)

# Add configuration for accessing S3

In [None]:
# AWS credentials for S3 access. The values below are redacted placeholders.
# NOTE(review): never commit real keys; load them from a secret store or
# environment variables when running for real.
aws_access_key = '####'
aws_secret_key = '#####'

hadoop_conf = spark._jsc.hadoopConfiguration()

# Hadoop's Configuration expects the un-prefixed key. The "spark.hadoop."
# prefix is only meaningful on SparkConf, so the original
# "spark.hadoop.fs.s3a.impl" key was stored but never consulted.
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", aws_access_key)
hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)

# NOTE(review): the keys below are Spark settings, not Hadoop ones. Writing
# them into the Hadoop configuration of an already-running session most likely
# has no effect (and the second 'spark.jars.packages' overwrites the first).
# They are kept as-is; to apply them, pass them to the SparkSession builder or
# the cluster configuration before the session starts.
hadoop_conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.1')
hadoop_conf.set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')
hadoop_conf.set('spark.network.timeout', '7200s')
hadoop_conf.set('spark.executor.heartbeatInterval', '1200s')

# Data Pre-processing:

In [None]:
def IntegerSafe(value):
    """Convert ``value`` to ``int``, returning None when it cannot be converted.

    Args:
        value: Anything accepted by ``int()`` (typically a raw text field).

    Returns:
        The integer value, or None for missing / non-integer input.
    """
    try:
        return int(value)
    # Only conversion failures are swallowed; a bare ``except`` would also
    # hide KeyboardInterrupt / SystemExit.
    except (TypeError, ValueError, OverflowError):
        return None

def FloatSafe(value):
    """Convert ``value`` to ``float``, returning None when it cannot be converted.

    Args:
        value: Anything accepted by ``float()`` (typically a raw text field).

    Returns:
        The float value, or None for missing / non-numeric input.
    """
    try:
        return float(value)
    # Only conversion failures are swallowed; a bare ``except`` would also
    # hide KeyboardInterrupt / SystemExit.
    except (TypeError, ValueError, OverflowError):
        return None
    
def toTimeSafe(inval):
    """Parse a ``YYYY/MM/DD`` text field into a ``datetime``.

    Args:
        inval: Raw field text, optionally wrapped in double quotation marks.

    Returns:
        The parsed ``datetime``, or None when the input is missing, not a
        string, or not in ``%Y/%m/%d`` format.
    """
    try:
        # Stripping happens inside the try so that None / non-string input
        # yields None instead of raising AttributeError.
        return datetime.strptime(inval.strip("\""), "%Y/%m/%d")
    except (AttributeError, TypeError, ValueError):
        return None
    
def BoolSafe(value):
    """Map the exact text ``"True"`` to True and everything else to False.

    Args:
        value: Raw field value.

    Returns:
        True only when ``value == "True"``; False otherwise. None is returned
        only if the comparison itself raises.
    """
    # NOTE(review): the match is case-sensitive, so "true" / "TRUE" map to
    # False -- confirm this matches the spelling used in the source file.
    try:
        return bool(value == "True")
    except Exception:
        return None

## 1. Covid data

In [None]:
# Load the COVID case file as text through the Spark context, drop the header
# line, and turn each comma-separated line into a typed 7-tuple.
covid_path = "s3://msds697distdata/COVID-19_Cases_by_Geography_and_Date.csv"
header = sc.textFile(covid_path).first()


def _to_covid_row(fields):
    """Convert one split line into (date, area_type, id, four float values)."""
    collected_on = toTimeSafe(fields[0])
    numeric_values = tuple(FloatSafe(fields[i]) for i in range(3, 7))
    return (collected_on, fields[1], fields[2]) + numeric_values


covid_file = (
    sc.textFile(covid_path)
    .filter(lambda line: line != header)
    .map(lambda line: line.split(','))
    .map(_to_covid_row)
)

# Schema: a date, two string descriptors, then four nullable float measures.
covid_fields = [
    StructField("Specimen_Collection_Date", DateType(), True),
    StructField("area_type", StringType(), True),
    StructField("id", StringType(), True),
]
covid_fields += [
    StructField(measure, FloatType(), True)
    for measure in (
        "acs_population",
        "New_Confirmed_Cases",
        "Cumulative_Confirmed_Cases",
        "Rate_of_Cumulative_Confirmed_Case",
    )
]
schema = StructType(covid_fields)

covid_df = spark.createDataFrame(covid_file, schema)

In [None]:
# Print the first 10 rows of covid_df as a sanity check on the parsing above.
covid_df.show(10)

In [None]:
# Neighbourhood -> population lookup, taken from the 'Analysis Neighborhood'
# rows of the covid data.
# The source frame has one row per collection date, so without distinct() this
# lookup repeats each neighbourhood once per date and multiplies the rows of
# the later join on 'id' (which then has to be de-duplicated again).
population = (
    covid_df
    .filter(covid_df['area_type'] == 'Analysis Neighborhood')
    .select('id', 'acs_population')
    .distinct()
)


In [None]:
# Roll the 'Analysis Neighborhood' rows up in two steps: new cases per
# (date, neighbourhood), then per (year, month, neighbourhood). The population
# lookup is attached afterwards and exact duplicate rows are removed.
neighborhood_rows = covid_df.filter(covid_df['area_type'] == 'Analysis Neighborhood')

daily_by_neighborhood = (
    neighborhood_rows
    .groupBy('Specimen_Collection_Date', 'id')
    .agg(sum('New_Confirmed_Cases').alias('daily_new_cases'))
    .select(
        'Specimen_Collection_Date',
        'id',
        year("Specimen_Collection_Date").alias('year'),
        month("Specimen_Collection_Date").alias('month'),
        'daily_new_cases',
    )
    .sort('Specimen_Collection_Date')
)

monthly_by_neighborhood = (
    daily_by_neighborhood
    .groupBy('year', 'month', 'id')
    .agg(sum('daily_new_cases').alias('monthly_cases'))
    .sort(['id', 'year', 'month'])
)

covid_df = (
    monthly_by_neighborhood
    .join(population, 'id', 'left_outer')
    .select('id', 'year', 'month', 'monthly_cases', 'acs_population')
    .distinct()
)

In [None]:
# Monthly cases as a percentage of the neighbourhood population, rounded to
# two decimals, with rows ordered by year, month and neighbourhood id.
pct_with_covid = round((covid_df['monthly_cases'] / covid_df['acs_population']) * 100, 2)

covid_df = (
    covid_df
    .select(
        'id',
        'year',
        'month',
        'monthly_cases',
        'acs_population',
        pct_with_covid.alias('percent_population_with_covid'),
    )
    .sort(['year', 'month', 'id'])
)

In [None]:
# Sum the neighbourhood-level monthly cases into one total per (year, month).
total_covid_cases_per_month = (
    covid_df
    .groupBy('year', 'month')
    .agg(sum('monthly_cases').alias('total_cases'))
)

## 2. Business data

In [None]:
# Schema for the registered-business rows, in the order the parsed tuple
# supplies them. Every column is a nullable string except the four date
# columns and the non-nullable business account number.
# The mailing-address (mail_*) and LIC code columns that were commented out of
# the original field list are not part of this schema.
_business_date_columns = {
    "bus_start_date", "bus_end_date", "loc_start_date", "loc_end_date",
}
_business_required_columns = {"bus_accnt_num"}
_business_column_names = [
    "location_id", "bus_accnt_num", "Owner_name", "DBA_name",
    "address", "city", "state", "zipcode",
    "bus_start_date", "bus_end_date", "loc_start_date", "loc_end_date",
    "NAICS_code", "NAICS_code_desc", "parking_tax", "transient_occ_tax",
    "supervisor_dist", "neigh_analysis", "bus_corr", "bus_loc", "uniqueID",
    "SF_neighbourhoods", "Police_district", "current_super_dist",
    "analysis_neighbor", "neighborhoods",
]

schema = StructType([
    StructField(
        name,
        DateType() if name in _business_date_columns else StringType(),
        name not in _business_required_columns,
    )
    for name in _business_column_names
])

In [None]:
# Read the SF registered-business TSV as text and split each line on tabs.
business_path = "s3://msds697distdata/Registered_Business_Locations_-_San_Francisco.tsv"
business = sc.textFile(business_path).map(lambda line: line.split('\t'))
def DateSafe(value):
    """Parse an ``MM/DD/YYYY`` text field into a ``datetime``.

    Args:
        value: Raw field text.

    Returns:
        The parsed ``datetime``, or None when the input is missing, not a
        string, or not in ``%m/%d/%Y`` format.
    """
    try:
        return datetime.strptime(value, '%m/%d/%Y')
    # Only parse failures are swallowed; a bare ``except`` would also hide
    # KeyboardInterrupt / SystemExit.
    except (TypeError, ValueError):
        return None
    
    
head = business.first()
# Row count of the raw file (this figure still includes the header row).
print(business.count())


def _to_business_row(x):
    """Pick and convert the source columns that feed the business schema.

    Source columns 12-15 and 20-21 are skipped; they have no field in the
    schema. Rows shorter than 32 columns raise IndexError.
    """
    return (
        x[0], x[1], x[2], x[3], x[4], x[5], x[6], IntegerSafe(x[7]),
        DateSafe(x[8]), DateSafe(x[9]), DateSafe(x[10]), DateSafe(x[11]),
        x[16], x[17], BoolSafe(x[18]), BoolSafe(x[19]),
        x[22], x[23], x[24], x[25], x[26],
        IntegerSafe(x[27]), IntegerSafe(x[28]), IntegerSafe(x[29]),
        IntegerSafe(x[30]), IntegerSafe(x[31]),
    )


# Drop the header row and convert every remaining row.
business = business.filter(lambda x: x != head).map(_to_business_row)

# The original ran a second business.count() here whose result was discarded
# (it was not the cell's last expression); it only cost a full Spark job.
business_df = spark.createDataFrame(business, schema)

In [None]:
# Keep only California rows that carry a neighbourhood, then drop the
# 'state' and 'city' columns.
has_neighborhood_in_ca = (col('neigh_analysis') != '') & (col('state') == 'CA')
business_df = business_df.filter(has_neighborhood_in_ca).drop('state', 'city')

In [None]:
# Remove duplicates: rows sharing owner name, DBA name, address and business
# start / end dates are treated as the same business.
dropbusDF = business_df.dropDuplicates(
    ["Owner_name", "DBA_name", "bus_start_date", "address", "bus_end_date"]
)
# The label previously read "department & salary", which was a copy-paste
# leftover unrelated to this dataset.
print("Distinct count of businesses : " + str(dropbusDF.count()))

In [None]:
# Print the first 10 de-duplicated business rows.
dropbusDF.show(10)

## 3. Crime data

In [None]:
# Read the SF crime CSV as text and split every line into fields.
def _split_csv_line(line):
    """Split one CSV line into fields, honouring double-quoted fields.

    A plain ``line.split(',')`` breaks any quoted field that itself contains a
    comma, which shifts every later column for that row.
    """
    import csv  # local import keeps this cell self-contained

    return next(csv.reader([line]), [])


crime_path = "s3://msds697distdata/Police_Department_Incident_Reports__2018_to_Present.csv"
crime = sc.textFile(crime_path).map(_split_csv_line)

# The first row is the header; remove it from the data.
columns = crime.first()
crime = crime.filter(lambda x: x != columns)

In [None]:

def TimeSafe(inval):
    """Parse a ``YYYY/MM/DD`` text field into a ``datetime``.

    Args:
        inval: Raw field text, optionally wrapped in single quotation marks.

    Returns:
        The parsed ``datetime``, or None when the input is missing, not a
        string, or not in ``%Y/%m/%d`` format.
    """
    # NOTE(review): the original comment mentioned double quotation marks, but
    # the code strips single quotes; the code's behaviour is kept -- confirm
    # which quoting the source file actually uses.
    try:
        # Stripping happens inside the try so that None / non-string input
        # yields None instead of raising AttributeError.
        return datetime.strptime(inval.strip("\'"), "%Y/%m/%d")
    except (AttributeError, TypeError, ValueError):
        return None
    
    
def _to_crime_row(x):
    """Keep source columns 1, 14, 21, 23, 24 and 25, converting date and floats."""
    incident_date = TimeSafe(x[1])
    latitude = FloatSafe(x[23])
    longitude = FloatSafe(x[24])
    return [incident_date, x[14], x[21], latitude, longitude, x[25]]


crime = crime.map(_to_crime_row)

# One nullable field per element of the row built above, in the same order.
_crime_columns = [
    ('Incident Date', DateType()),
    ('Incident Category', StringType()),
    ('Analysis Neighborhood', StringType()),
    ('Latitude', FloatType()),
    ('Longitude', FloatType()),
    ('Point', StringType()),
]
schema_crime = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _crime_columns]
)

df_crime = spark.createDataFrame(crime, schema_crime)

In [None]:
# Print the first 10 parsed crime rows.
df_crime.show(10)

In [None]:
# Derive year and month from the incident date, and rename the two columns
# whose names contain spaces so they can be used in SQL filter strings.
# (The original renamed 'Incident Category' twice; the second call was a no-op.)
df_crime = (
    df_crime
    .withColumn('year', year(col('Incident Date')))
    .withColumn('month', month(col('Incident Date')))
    .withColumnRenamed('Incident Category', 'IncidentCategory')
    .withColumnRenamed('Analysis Neighborhood', 'AnalysisNeighborhood')
)

# Filter out vehicle-recovery records.
df_crime = df_crime.filter("IncidentCategory !='Recovered Vehicle'")

# Flag each incident as larceny theft or any other category.
df_crime = (
    df_crime
    .withColumn('Larceny_theft', col('IncidentCategory') == 'Larceny Theft')
    .withColumn('other_incident', col('IncidentCategory') != 'Larceny Theft')
)

# Filter out incidents without a latitude.
df_crime = df_crime.filter("Latitude IS NOT NULL")

In [None]:
# Print 3 rows from the latest year (ascending=0 sorts 'year' in descending order).
df_crime.orderBy('year',ascending=0).show(3)

In [None]:
# Count incidents per neighbourhood, year and month; latest years first.
df_crime_grouped = (
    df_crime
    .groupBy('AnalysisNeighborhood', 'year', 'month')
    .count()
    .orderBy('year', ascending=False)
)

In [None]:
# Print the first 10 rows of the per-neighbourhood monthly incident counts.
df_crime_grouped.show(10)

# Joining datasets

In [None]:
# Distinct row counts of the three inputs to the joins below, printed in the
# order: businesses, covid, grouped crime. Each count() runs its own Spark job.
print(dropbusDF.distinct().count(),covid_df.distinct().count(), df_crime_grouped.distinct().count())

In [None]:
# Attach the monthly crime counts to every business in the same neighbourhood.
# The left join keeps businesses whose neighbourhood has no crime rows.
same_neighborhood = dropbusDF.neigh_analysis == df_crime_grouped.AnalysisNeighborhood
joined = dropbusDF.join(df_crime_grouped, same_neighborhood, 'left_outer')

In [None]:
# Attach the monthly covid figures by neighbourhood, year and month.
# Left join: rows without matching covid data are kept with null covid columns.
covid_match = (
    (covid_df.id == joined.neigh_analysis)
    & (covid_df.year == joined.year)
    & (covid_df.month == joined.month)
)
joined_covid = joined.join(covid_df, covid_match, 'left_outer').cache()

In [None]:
# Row count of the joined data; joined_covid was marked with .cache() when it was built.
joined_covid.count()

In [None]:
# Drop the columns duplicated by the covid join: the covid-side 'year' and
# 'month' (referenced through covid_df so the other 'year'/'month' columns
# stay) and the covid 'id' column.
joined_df = joined_covid.drop(covid_df.year).drop(covid_df.month).drop('id')

In [None]:
#filtering the start and end dates of the business as per our requirement
#The data has been made monthly time series for each business as per the neighborhood
def year_month(value):
    """Render a month number as text, left-padded with "0" when below 10.

    Args:
        value: Month number.

    Returns:
        A string such as "03" or "12". Values of 10 and above were previously
        returned unconverted; they are now always strings so the return type
        is consistent. None is returned when ``value`` cannot be compared
        with a number (for example None).
    """
    try:
        text = str(value)
        return "0" + text if value < 10 else text
    except (TypeError, ValueError):
        return None
month_udf = udf(year_month)

# Build sortable 'yyyymm' text keys for the observation month and for the
# location start / end dates, then filter on them:
#   1. keep rows whose location start month is on or before the observation month;
#   2. keep rows with no end date, or an end date in 2018 or later (the data
#      starts in 2018);
#   3. keep rows with no end date, or an end month on or before the observation month.
# Filter 2 previously compared the 'yyyymm' text with the integer 2018, which
# is true for every 'yyyymm' value (e.g. 199905 >= 2018), so it removed
# nothing. It now compares against the text key '201801'.
# NOTE(review): filter 3 keeps months on or after a closure rather than
# before it; this matches the original comment, but confirm it is intended.
final_df = (
    joined_df
    .withColumn('year_month', concat(col('year'), month_udf(col('month'))))
    .withColumn('year_month_strt', concat(year('loc_start_date'), month_udf(month('loc_start_date'))))
    .withColumn('year_month_end', concat(year('loc_end_date'), month_udf(month('loc_end_date'))))
    .filter("year_month_strt <= year_month")
    .filter("year_month_end IS NULL or year_month_end >= '201801'")
    .filter("year_month_end IS NULL or year_month_end <= year_month")
    .drop("year")
    .drop("month")
    .cache()
)

In [None]:
# Print 2 rows of the filtered monthly business data.
final_df.show(2)

In [None]:
# Row count after the start / end date filters.
final_df.count()

In [None]:
# Closure flag: 1 when the location's end month equals the observation month,
# 0 otherwise (including rows with no end date).
closed_in_month = final_df.year_month_end == final_df.year_month
df_closed_flag = final_df.withColumn(
    'covid_close_flag_new',
    when(closed_in_month, 1).otherwise(0),
)


In [None]:
# Show how many rows carry each value of the closure flag.
df_closed_flag.groupBy('covid_close_flag_new').count().show()

In [None]:
# Missing-value imputation for the covid columns: nulls (rows with no covid
# match in the left join) are replaced with 0.
df_miss_val = df_closed_flag
for covid_column in ("monthly_cases", "acs_population", "percent_population_with_covid"):
    df_miss_val = df_miss_val.withColumn(
        covid_column,
        when(col(covid_column).isNull(), 0).otherwise(col(covid_column)),
    )

In [None]:
# Print 3 rows of the imputed data.
df_miss_val.show(3)

In [None]:
# Elapsed wall-clock seconds since start_time was recorded in the first cell.
print("--- %s seconds ---" % (time.time() - start_time))

# Connect to MongoDB
## Store aggregates in the database and re-read for machine learning later

In [None]:
# MongoDB connection settings. The '###' values are redacted placeholders.
# NOTE(review): supply real values from a secret store or environment
# variables rather than committing them to the notebook.
database = '###'
collection = '###'
user_name = '###'
password = '###'
address = 'group-11.akldt.mongodb.net'
# URI layout: mongodb+srv://<user>:<password>@<host>/<database>.<collection>
# NOTE(review): user names / passwords containing special characters probably
# need percent-encoding before being placed in the URI -- confirm.
connection_string = f"mongodb+srv://{user_name}:{password}@{address}/{database}.{collection}"
# NOTE(review): a commented-out connection string that embedded what looked
# like a real user name and password was removed from here; rotate that
# credential if it was ever live.

In [None]:
# Write the prepared data to the MongoDB collection named in connection_string.
# mode("append") adds to the existing collection, so re-running this cell
# writes the same rows again.
df_miss_val.write.format("mongo").option("uri",connection_string).mode("append").save()

In [None]:
# Read the collection back from MongoDB into a DataFrame.
df = spark.read.format("mongo").option("uri",connection_string).load()

In [None]:
# Print the first rows read back from MongoDB (show() with no argument uses its default row limit).
df.show()