# Configurations

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -O ./spark-3.3.1-bin-hadoop3.tgz  https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar zxvf ./spark-3.3.1-bin-hadoop3.tgz 
!pip install findspark 

In [None]:
import os

# Tell PySpark where the Java runtime and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.1-bin-hadoop3",
})

In [None]:
import findspark
import random
# Must run before the first `import pyspark`; relies on SPARK_HOME set in the previous cell.
findspark.init()

In [None]:
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.appName('lr_example').getOrCreate()

In [None]:
# Other imports
import numpy as np
import pandas as pd

import pyspark
from pyspark.rdd import RDD
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.functions import substring, when, concat, lit, col, to_date, regexp_replace, udf

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

import warnings
warnings.filterwarnings("ignore") # Silences ALL Python warnings (not only font warnings), including deprecation notices

import seaborn as sns
import matplotlib.pyplot as plt
# Default size (width, height) for every figure in this notebook
plt.rcParams['figure.figsize'] = (15,7)

import datetime

# Import Data and Translate the data

In [None]:
!pip install -U -q PyDrive

from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate the Colab user and create the PyDrive client.
# `drive` is used by the download cells further down to fetch the CSV files.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [None]:
# link = 'https://drive.google.com/file/d/1d8b_n-6fo5HbxyW-kwY3c8fWlvLOcFYy/view?usp=share_link'
 
# # to get the id part of the file
# id = link.split("/")[-2]
 
# downloaded = drive.CreateFile({'id':id})
# downloaded.GetContentFile('-3-2022-.csv') 
 
# df_pd = pd.read_csv('-3-2022-.csv')
# df_pd.info()

In [None]:
# In this section we translated the data from Hebrew to English. We then downloaded the translated file and opened it with PySpark.

In [None]:
# !pip install googletrans==4.0.0rc1 gwpy &> /dev/null

In [None]:
# from googletrans import Translator
# translator = Translator()

In [None]:
# def translate_column (df:pd.DataFrame, name:str):
#   y = 'he_' + name
#   column_to_translate = pd.DataFrame(list(df[name].unique())).rename({0:y},axis=1)
#   x = 'en_' + name
#   column_to_translate[x] = column_to_translate[y].apply(lambda x: translator.translate(x, src='he', dest='en').text)
#   df = df.merge(column_to_translate, left_on=name, right_on=y,how ='inner')
#   df[name]=df[x]
#   df=df.drop(columns=[x,y])
#   return df

In [None]:
# en_df_pd = df_pd
# column_names = en_df_pd.columns
# k = column_names.delete([4,5,6,9])
# for i in k:
#   en_df_pd = translate_column(en_df_pd,i)
# en_df_pd

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

# en_df_pd.to_csv('/content/drive/MyDrive/BDP_Project/en_df.csv', index=False)

# Create Spark-DF

In [None]:
# Obtain the Spark session for the data-loading cells.
# NOTE: a session was already created in the configuration section, so getOrCreate()
# returns that existing session; master/appName below only take effect when this cell
# is the first one to create a session (e.g. when it is run on its own).
# The extra bare `SparkSession.builder.getOrCreate()` call was redundant and is removed.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
)

In [None]:
link = 'https://drive.google.com/file/d/1lUTnYWvVjtoHVkiH3whj-dREJDdPoez2/view?usp=share_link'

# The Drive file id is the second-to-last path segment of the share link.
# It is stored in `file_id` so that the built-in id() is not shadowed.
file_id = link.split("/")[-2]

# Download the translated crime data to the local working directory.
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('en_df.csv')

# 'df' holds the translated (English) data.
df = spark.read.csv("en_df.csv", inferSchema=True, header=True)
df.printSchema()
df.show(5)

In [None]:
# Repair two district names that were translated incorrectly.
df = (
    df
    .withColumn('PoliceDistrict', regexp_replace('PoliceDistrict', 'Cell County', 'Tel Aviv District'))
    .withColumn('PoliceDistrict', regexp_replace('PoliceDistrict', 'Shay County', 'Judea and Samaria District'))
)

In [None]:
# Store the case count ('TikimSum') as an integer instead of a double.
df1 = df.withColumn("TikimSum", col("TikimSum").cast(IntegerType()))

In [None]:
# This cell and the next turn the 'Quarter' text column into a real date.
# Step 1: take characters 1-4 as the year and the 2 characters starting at position 6 as the quarter label.
df2 = (
    df1
    .withColumn('year', substring('Quarter', 1, 4))
    .withColumn('Q', substring('Quarter', 6, 2))
)
df2.printSchema()
df2.show(truncate=False)

In [None]:
# Step 2: map every quarter to the last day of that quarter and parse the result as a date.
df2 = df2.withColumn("year", col("year").cast(StringType()))
df3 = (
    df2
    # "-dd-MM" suffix of the quarter's last day; anything that is not Q2/Q3/Q4 is treated as Q1.
    .withColumn(
        "date",
        when(col("Q") == 'Q2', lit("-30-06"))
        .when(col("Q") == 'Q3', lit("-30-09"))
        .when(col("Q") == 'Q4', lit("-31-12"))
        .otherwise(lit("-31-03")),
    )
    .withColumn("Exact Date", concat(col('year'), col('date')))
    # The pattern is year-day-month to match the suffixes built above.
    .withColumn("Date Value", to_date(col('Exact Date'), "yyyy-dd-MM"))
)
df3.show(5)
df3.printSchema()

In [None]:
link = "https://drive.google.com/file/d/1RBkuTYpTQHl0EjhiteL3DjSLd5xUxvYa/view?usp=share_link"

# The Drive file id is the second-to-last path segment of the share link.
# It is stored in `file_id` so that the built-in id() is not shadowed.
file_id = link.split("/")[-2]

# Download the city table to the local working directory.
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('city_data_.csv')

citydata = spark.read.csv("city_data_.csv", inferSchema=True, header=True)

In [None]:
# 'citydata' provides each city's population and its English name.
# Give the Hebrew-named columns English names.
citydata = (
    citydata
    .withColumnRenamed("שם יישוב", "Settlement_Council")
    .withColumnRenamed("סך הכל אוכלוסייה 2021", "Population")
    .withColumnRenamed("תעתיק", "City Name")
)

citydata.show(5)

In [None]:
# Keep only the columns needed from the city table.
city_df = citydata.select(["Settlement_Council", "Population", "City Name"])

# Left join onto the main data (df3) by 'Settlement_Council', so crime rows without a city match are kept.
df4 = df3.join(city_df, 'Settlement_Council', 'left')

In [None]:
link = "https://drive.google.com/file/d/1FRVWLJLf_h-vWivkEzvC5RJBiI5kRnPW/view?usp=share_link"

# The Drive file id is the second-to-last path segment of the share link.
# It is stored in `file_id` so that the built-in id() is not shadowed.
file_id = link.split("/")[-2]

# Download the socio-economic table to the local working directory.
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('s_e_2019.csv')

socio_economic_df = spark.read.csv("s_e_2019.csv", inferSchema=True, header=True)

In [None]:
# 'socio_economic_df' supplies the socio-economic cluster of each city:
# cluster 1 is the lowest socio-economic level,
# cluster 10 is the highest.
socio_economic_df.show(5)

In [None]:
# Only the city key and its 2019 socio-economic cluster are needed.
socio_economic_df = socio_economic_df.select("Settlement_Council", "City_Cluster_2019")

# Left join onto df4 (main data already enriched with population) by 'Settlement_Council'.
df5 = df4.join(socio_economic_df, 'Settlement_Council', 'left')

In [None]:
# With both lookups merged, the English city name ('City Name') becomes 'Settlement_Council';
# the original Hebrew key is kept as 'Settlement_Council_Hebrew'.
df5 = (
    df5
    .withColumnRenamed("Settlement_Council", "Settlement_Council_Hebrew")
    .withColumnRenamed("City Name", "Settlement_Council")
)


In [None]:
# Apply the same renaming to the city lookup so it can be joined to df5 on the English name.
city_df = (
    city_df
    .withColumnRenamed("Settlement_Council", "Settlement_Council_Hebrew")
    .withColumnRenamed("City Name", "Settlement_Council")
)

In [None]:
# Inspect the final prepared table: schema first, then the first rows without truncating long values.
df5.printSchema()
df5.show(truncate=False)

In [None]:
# The data preparation is now finished; the analysis starts below.

# Base Analysis

In [None]:
# df_Tikim_City: one row per city with its socio-economic cluster and the number of cases per resident.
df_Tikim_City = (
    df5.groupBy('Settlement_Council').sum('TikimSum')
    # population comes from the city lookup
    .join(city_df, on='Settlement_Council', how='left')
    # one (city, cluster) row per city
    .join(df5.select('Settlement_Council', 'City_Cluster_2019').distinct(), on='Settlement_Council', how='left')
    .withColumn("Tikim by population", col("sum(TikimSum)") / col("Population"))
    .sort("Tikim by population", ascending=False)
)
df_Tikim_City.select("Settlement_Council", "City_Cluster_2019", "Tikim by population").show(10)

In [None]:
# # If the cell below doesn't work, run this:
!python -m pip uninstall matplotlib
!pip install matplotlib==3.1.3

In [None]:
# Show the distribution of the number of cases per resident across cities.
# toPandas() collects the per-city table to the driver before plotting.
sns.distplot(df_Tikim_City.toPandas()["Tikim by population"]);

In [None]:
# df_Tikim_District holds the total number of cases for every (police district, quarter-end date) pair.
df_Tikim_District = df5.groupBy('PoliceDistrict','Date Value').sum('TikimSum')

In [None]:
# Collect the aggregated table to a pandas DataFrame for plotting.
pd_Tikim_District = df_Tikim_District.toPandas()

In [None]:
# One line per police district: number of cases over time.
districts = pd.unique(pd_Tikim_District['PoliceDistrict'])
for district_name in districts:
    district_rows = pd_Tikim_District.loc[pd_Tikim_District['PoliceDistrict'] == district_name]
    district_rows = district_rows.sort_values(by='Date Value')
    plt.plot(district_rows['Date Value'], district_rows['sum(TikimSum)'])
plt.legend(districts, loc="upper left")
# Dashed vertical markers at the two reference dates.
for marker_date in (datetime.date(2020, 4, 8), datetime.date(2021, 3, 31)):
    plt.axvline(x=marker_date, color='black', linestyle='dashed')
plt.ylim([0, 25000])
plt.show()

In [None]:
# Total cases per (police district, crime group, quarter-end date).
# NOTE: this name differs from df_Tikim_District (defined earlier) only by letter case.
df_tikim_district = df5.groupBy('PoliceDistrict','StatisticCrimeGroup','Date Value').sum('TikimSum')

In [None]:
# One figure per police district; inside each figure, one line per 'StatisticCrimeGroup'
# showing the number of cases over time.
district = ["Southern District", "Judea and Samaria District", "Jerusalem District", "North District", "Beach district", "Central district", "Tel Aviv District"]
for district_name in district:
    pd_tikim_district = df_tikim_district.where(col('PoliceDistrict') == district_name).toPandas()
    types = pd.unique(pd_tikim_district['StatisticCrimeGroup'])
    # Drop the "Nan" placeholder group so it gets neither a line nor a legend entry.
    types = np.delete(types, np.where(types == "Nan"))
    for crime_group in types:
        group_rows = pd_tikim_district.loc[pd_tikim_district['StatisticCrimeGroup'] == crime_group]
        group_rows = group_rows.sort_values(by='Date Value')
        plt.plot(group_rows['Date Value'], group_rows['sum(TikimSum)'])
    plt.legend(types, loc="upper left")
    # Dashed vertical markers at the two reference dates.
    for marker_date in (datetime.date(2020, 4, 8), datetime.date(2021, 3, 31)):
        plt.axvline(x=marker_date, color='black', linestyle='dashed')
    plt.title(district_name)
    plt.show()

# Establishing "Crime Score"

In [None]:
# One row per crime group; 'Score' starts as a copy of the group name and is
# replaced by the numeric score in the next cell.
crime_types = (
    df5.select('StatisticCrimeGroup')
    .distinct()
    .withColumn('Score', col('StatisticCrimeGroup'))
)
df5.select('StatisticCrimeGroup').distinct().show(17, False)

In [None]:
# Severity score assigned to every crime group (kept as strings because they
# replace string values in the 'Score' column).
crime_scores = {
    'The rest of the offenses': '1',
    'Traffic violations': '2',
    'Nan': '3',
    'Permitted offenses': '1',
    'Offenses against a person': '5',
    'Confidence offenses': '3',
    'Sex offenses': '5',
    'Administrative offenses': '1',
    'Economic offenses': '2',
    'Public order offenses': '1',
    'Offenses toward property': '3',
    'Setup Sections': '1',
    'Fraudulent offenses': '3',
    'Offenses against body': '5',
    'Offenses toward morality': '4',
    'Unknown': '3',
    'The rest of the rest': '1',
}
crime_types_score = crime_types.replace(crime_scores, subset=['Score'])
crime_types_score.sort('Score', ascending=False).show(17, False)


$P = $ the population number of the city

$n_i = $ number of specific crime group cases

$S_i = $ the score of the specific crime group

$$CrimeScore = \frac{1}{P}\sum_{i}{n_{i}\cdot S_{i}}$$

In [None]:
# Weighted case total per city: sum over rows of (number of cases * crime-group score).
df6 = (
    df5.join(crime_types_score, on='StatisticCrimeGroup', how='left')
    .withColumn("Total_Score", col("TikimSum") * col("Score"))
    .groupBy('Settlement_Council')
    .sum('Total_Score')
)
# Attach the city total back to every row and normalise it by the city's population.
df_score_city = (
    df5.join(df6, on='Settlement_Council', how='left')
    .withColumn("Score per population", col("sum(Total_Score)") / col("Population"))
)

In [None]:
# 'df_score_city' gives the 'Score per population' of every city.
# The higher the score, the higher the level of crime in the city.
# distinct() collapses the per-row table to one row per city before sorting.

df_score_city.select("Settlement_Council",'City_Cluster_2019',"Score per population").distinct().sort("Score per population",ascending=False).show(10)

In [None]:
# Scatter of 'Score per population' against the socio-economic cluster, one point per city,
# with a fitted regression line (red) to check for a correlation.
pd_score = df_score_city.select('Settlement_Council','City_Cluster_2019','Score per population').distinct().toPandas()
# x/y are passed by keyword: positional data arguments are deprecated in seaborn 0.11
# and rejected by later releases.
sns.regplot(x=pd_score['City_Cluster_2019'], y=pd_score['Score per population'], ci=0, line_kws={"color": "red"})

In [None]:
# Show the distribution of 'Score per population' across cities.
sns.distplot(pd_score['Score per population']);

$N = $ overall number of cases in the city 

$n_i = $ number of specific crime group cases

$S_i = $ the score of the specific crime group

$$CrimeScore = \frac{1}{N}\sum_{i}{n_{i}\cdot S_{i}}$$

In [None]:
# Second crime-score formula: normalise by the city's total number of cases instead of its population.
df7 = (
    df5.join(crime_types_score, on='StatisticCrimeGroup', how='left')
    .withColumn("Total_Score", col("TikimSum") * col("Score"))
)
cases_by_city = df7.groupBy('Settlement_Council')
df8 = cases_by_city.sum('Total_Score')   # weighted case total per city
df9 = cases_by_city.sum('TikimSum')      # plain case total per city
df_score_city2 = (
    df9.join(df8, on='Settlement_Council', how='left')
    .withColumn("Score per population2", col("sum(Total_Score)") / col("sum(TikimSum)"))
)

In [None]:
# 'df_score_city2' gives 'Score per population2' for every city: despite the column name,
# this score is normalised by the city's total number of cases, not by its population.
# The higher the score, the more severe the city's crime mix.
df_score_city2.select("Settlement_Council","Score per population2").distinct().sort("Score per population2",ascending=False).show(10)

In [None]:
# Attach each city's socio-economic cluster.
# distinct() reduces the lookup to one row per (city, cluster); without it the join
# emits one output row for every df5 row of the city, which later cells then have to
# deduplicate again.
df_score_city2 = df_score_city2.join(
    df5.select('Settlement_Council', 'City_Cluster_2019').distinct(),
    on='Settlement_Council',
    how='left',
)

In [None]:
# Scatter of 'Score per population2' against the socio-economic cluster, one point per city,
# with a fitted regression line (red) to check for a correlation.
pd_score2 = df_score_city2.select('Settlement_Council','City_Cluster_2019','Score per population2').distinct().toPandas()
# x/y are passed by keyword: positional data arguments are deprecated in seaborn 0.11
# and rejected by later releases.
sns.regplot(x=pd_score2['City_Cluster_2019'], y=pd_score2['Score per population2'], ci=0, line_kws={"color": "red"})

$P = $ the population number of the city

$N = $ overall number of cases in the city 

$n_i = $ number of specific crime group cases

$S_i = $ the score of the specific crime group

$$CrimeScore = \frac{1}{2} \cdot (\frac{1}{P}\sum_{i}{n_{i}\cdot S_{i}} + \frac{1}{N}\sum_{i}{n_{i}\cdot S_{i}})$$


In [None]:
# Average the two previous crime scores into one and plot it against the socio-economic cluster.
dft1 = df_score_city.select("Settlement_Council","Score per population",'City_Cluster_2019').distinct()
dft2 = df_score_city2.select("Settlement_Council","Score per population2").distinct()
df_score_city3 = dft1.join(dft2, on="Settlement_Council", how="left")
df_score_city3 = df_score_city3.withColumn("Average Crime Score",(col("Score per population") + col("Score per population2"))/2)
pd_score3 = df_score_city3.select('Settlement_Council', 'City_Cluster_2019','Average Crime Score').distinct().toPandas()
# x/y are passed by keyword: positional data arguments are deprecated in seaborn 0.11
# and rejected by later releases.
sns.regplot(x=pd_score3['City_Cluster_2019'], y=pd_score3['Average Crime Score'], ci=0, line_kws={"color": "red"})

In [None]:
# Show the distribution of the 'Average Crime Score' across cities.
sns.distplot(pd_score3['Average Crime Score']);

In [None]:
# 'df_score_city3' gives the 'Average Crime Score' of every city.
# The higher the score, the higher the level of crime in the city.
df_score_city3.select("Settlement_Council","City_Cluster_2019","Average Crime Score").distinct().sort("Average Crime Score",ascending=False).show(10)

# Deeper analysis and prediction

In [None]:
# Reminder of the prepared table that the prediction data below is built from.
df5.show()

In [None]:
# Build a wide table for prediction over time: one row per city and one column
# per quarter ("<year>_<quarter>") holding the number of cases in that quarter.
years = ['2017', '2018', '2019', '2020', '2021', '2022']
quarters = ['Q1', 'Q2', 'Q3', 'Q4']
df_ml = df5.select("Settlement_Council","Population","City_Cluster_2019").distinct().dropna()

# Cases per (city, year, quarter). This aggregation does not depend on the loop
# variables, so it is defined once instead of once per quarter.
df_by_time = df5.groupBy('Settlement_Council', 'year', 'Q').sum('TikimSum')

for year in years:
    for q in quarters:
        if year == '2022' and q == 'Q4':  # Skipped because there is no data for this quarter
            continue
        year_q = year + "_" + q
        # Rows of this single quarter, with the count column named after the quarter.
        df_temp = (
            df_by_time
            .where((col('year') == year) & (col('Q') == q))
            .withColumnRenamed('sum(TikimSum)', year_q)
        )
        # Left join: cities without cases in this quarter get a null in the new column.
        df_ml = df_ml.join(df_temp.select('Settlement_Council', year_q), on='Settlement_Council', how='left')

In [None]:
# Preview the wide per-city table (one column per quarter).
df_ml.show(5)

In [None]:
# Feature columns: skip the first three columns (city, population, cluster) and the
# last quarter column ("2022_Q3"), which is used as the label.
training_columns = df_ml.columns[3:-1]

In [None]:
# Create a feature vector by combining the quarter columns.
# Rows with any null are dropped first, so only cities with a value in every column remain.
# NOTE: this rebinds df_ml, so later cells see the filtered table.
df_ml = df_ml.dropna()
vector_assembler = VectorAssembler(inputCols=training_columns, outputCol="features", handleInvalid="keep")
df_ml_vec = vector_assembler.transform(df_ml)

In [None]:
# Split the data into training (80%) and testing (20%) sets.
# A fixed seed keeps the split, and therefore the fitted model and the reported R2,
# repeatable between runs on the same data.
df_ml_data = df_ml_vec.select("Settlement_Council","features","2022_Q3")
training_data, testing_data = df_ml_data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Train the linear regression model: the earlier quarters ("features") predict the
# number of cases in 2022 Q3.
lr = LinearRegression(featuresCol="features", labelCol="2022_Q3")
model = lr.fit(training_data)

In [None]:
# Use the model to make predictions on the held-out testing data;
# transform() adds a 'prediction' column next to the true '2022_Q3' value.
predictions = model.transform(testing_data)
predictions.show(5)

In [None]:
# Evaluate the predictions on the test set with the R2 (coefficient of determination) metric.
reg_eval = RegressionEvaluator(predictionCol='prediction', labelCol='2022_Q3', metricName='r2')
reg_eval.evaluate(predictions)

In [None]:
df_2023 = df_ml.dropna()
# Shift the feature window one quarter forward (drop the oldest quarter, include "2022_Q3")
# so the trained model predicts the quarter that follows 2022 Q3.
# A separate name is used so that `training_columns` keeps the training-time column list;
# overwriting it would make a re-run of the feature-assembly cell build the wrong features.
prediction_columns = df_ml.columns[4:]
vector_assembler_2023 = VectorAssembler(inputCols=prediction_columns, outputCol="features", handleInvalid="keep")
df_2023 = vector_assembler_2023.transform(df_2023).select("Settlement_Council","features","2022_Q3")
predictions_2023 = model.transform(df_2023)

In [None]:
# Relative change of the predicted next-quarter cases compared with the observed 2022 Q3 cases.
# NOTE(review): cities with 0 cases in 2022 Q3 divide by zero here - confirm how these rows should be handled.
predictions_2023 = predictions_2023.withColumn("Crime Increase",(col("prediction")-col("2022_Q3"))/col("2022_Q3"))

In [None]:
# Attach each city's socio-economic cluster (one lookup row per city/cluster pair) for the report below.
predictions_2023 = predictions_2023.join(df5.select("Settlement_Council","City_Cluster_2019").distinct(), on="Settlement_Council", how="left")

In [None]:
# Cities ordered by predicted relative increase, largest first.
# NOTE: the features end at 2022 Q3, so the prediction is for the following quarter.
print("The cities with the predicted crime increase for 2023:")
predictions_2023.sort("Crime Increase",ascending=False).select("Settlement_Council","City_Cluster_2019","Crime Increase").show()

In [None]:
# Cities ordered by predicted relative change, smallest (largest decrease) first.
# NOTE: the features end at 2022 Q3, so the prediction is for the following quarter.
print("The cities with the predicted crime decrease for 2023:")
predictions_2023.sort("Crime Increase",ascending=True).select("Settlement_Council","City_Cluster_2019","Crime Increase").show()