#### Useful Links 
- Spark History Server : http://83.212.73.248:18080/
- Hadoop YARN (scheduler) : http://83.212.73.248:8088/cluster
- HDFS : http://83.212.73.248:9870/dfshealth.html#tab-overview

#### Useful Commands : 
- Connect to okeanos-master (from local) : `$ ssh user@snf-40202.ok-kno.grnetcloud.net `
    - Password : 'Rand0m'
- Connect to okeanos-worker (from okeanos-master) : `$ ssh okeanos-worker`
- Open Jupyter Notebook : `$ jupyter notebook --ip 83.212.73.248 --port 8888`

#### Thinks to do :
- Make the data Csv to Parquet
- Make those columns the type we want
- Write the Queries (!)
- Benchmark and optimize them etc.
- Balance the data onto HDFS across the two datanodes

### Full HDFS path is here : hdfs://okeanos-master:54310/csv_data/
and contains :  
     
     1.  hdfs://okeanos-master:54310/csv_data/LAPD_Police_Stations.csv
     2.  hdfs://okeanos-master:54310/csv_data/crime_data_2019.csv 
     3.  hdfs://okeanos-master:54310/csv_data/crime_data_2023.csv
     4.  hdfs://okeanos-master:54310/csv_data/revgecoding.csv 
     5.  hdfs://okeanos-master:54310/csv_data/income/
         1. LA_income_2015.csv
         2. LA_income_2017.csv
         3. LA_income_2019.csv
         4. LA_income_2021.csv

In [1]:
# Pyspark Imports
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import *
from pyspark.sql.functions import to_date
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import geopy.distance
import time 
import math

In [2]:
# initialize sparkSession, make the data from csv to parquet,
spark = SparkSession \
    .builder \
    .appName("2 Executors") \
    .config("spark.driver.cores", "1") \
    .config("spark.driver.memory", "1g") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "1g") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/08 20:46:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/08 20:46:58 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [3]:
# load data into memory, do the necessary joins etc. here
crime_data = spark.read.parquet("hdfs://okeanos-master:54310/parquet/crime_data_*.parquet")
revge = spark.read.parquet("hdfs://okeanos-master:54310/parquet/revgecoding.parquet")
# only 2015 income data needed
income = spark.read \
            .parquet("hdfs://okeanos-master:54310/parquet/income/LA_income_2015.parquet")
lapd_stations = spark.read.parquet("hdfs://okeanos-master:54310/parquet/LAPD_Police_Stations.parquet")

                                                                                

In [4]:
crime_data = crime_data.withColumn("Date Rptd", to_timestamp("Date Rptd", 'MM/dd/yyyy hh:mm:ss a')) \
    .withColumn("DATE OCC", to_timestamp("DATE OCC", 'MM/dd/yyyy hh:mm:ss a')) \
    .withColumn("Vict Age", col("Vict Age").cast("int")) \
    .withColumn("LAT", col("LAT").cast("double")) \
    .withColumn("LON", col("LON").cast("double")) \
    .withColumn("Premis_Desc", col("Premis Desc"))

In [5]:
# calculate distance on a sphere (as earth is not flat)
def haversine(lon1, lat1, lon2, lat2):
    R = 6371
    dLat = math.radians(lat2 - lat1)
    dLon = math.radians(lon2 - lon1)
    a = math.sin(dLat / 2) * math.sin(dLat / 2) + \
        math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dLon / 2) * math.sin(dLon / 2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    distance = R * c
    return distance

haversine_udf = udf(haversine, DoubleType())

In [6]:
def query_4_2a():
    start_time = time.time()

    crime_data_filtered = crime_data.withColumn('Weapon Used Cd', col('Weapon Used Cd').cast('int')) \
                                    .filter(col('Weapon Used Cd') < 200) \
                                    .withColumn('year', F.year('Date Rptd'))
    crime_data_filtered.count()
    combined_data = crime_data_filtered.hint('BROADCAST').crossJoin(broadcast(lapd_stations))

    combined_data = combined_data.withColumn("closest_distance", haversine_udf(col("LON"), col("LAT"), col("X"), col("Y")))
    
    windowSpec = Window.partitionBy("DR_NO").orderBy("closest_distance")
    closest_stations = combined_data.withColumn("rank", rank().over(windowSpec)).filter(col("rank") == 1)
    
    final_data = closest_stations.select(col("DR_NO"), col("DIVISION").alias("closest_station"), col("closest_distance"))
    
    joined_with_stations = crime_data_filtered.join(final_data, "DR_NO")
    
    result = joined_with_stations.groupBy('year') \
        .agg((F.sum('closest_distance') / F.count('*')).alias('average_distance'),
            F.count('*').alias('#')) \
       .orderBy(F.col('year'))

    
    result.show()
    end_time = time.time()
    print(f'Method : Broadcast | Time {end_time - start_time}')
    return end_time - start_time

In [7]:
query_4_2a()

In [8]:
# 2b)
def query_4_2b(method = 'CONTINUE'):
    start_time = time.time()

    crime_data_filtered = crime_data.withColumn('Weapon Used Cd', col('Weapon Used Cd').cast('int')) \
                                  .filter(F.col('Weapon Used Cd').isNotNull()) \
                                  .withColumn('year', F.year('Date Rptd'))
    
    if method == 'BROADCAST':
        combined_data = crime_data_filtered.hint(method).crossJoin(broadcast(lapd_stations))
        combined_data.explain()
        
    elif method in ['MERGE', 'SHUFFLE_HASH', 'SHUFFLE_REPLICATE_NL']:
        combined_data = crime_data.hint(method).crossJoin(lapd_stations)
        combined_data.explain()
        
    elif method == 'CONTINUE':
        combined_data = crime_data.crossJoin(lapd_stations)
    else:
        return None

    
    combined_data = combined_data.withColumn("closest_distance", haversine_udf(col("LON"), col("LAT"), col("X"), col("Y")))
    
    windowSpec = Window.partitionBy("DR_NO").orderBy("closest_distance")
    closest_stations = combined_data.withColumn("rank", rank().over(windowSpec)).filter(col("rank") == 1)
    
    final_data = closest_stations.select(col("DR_NO"), col("DIVISION").alias("closest_station"), col("closest_distance"))
    
    joined_result = crime_data_filtered.join(final_data, "DR_NO")
   # crime_data_join_stations = result.withColumn('Weapon Used Cd', col('Weapon Used Cd').cast('int')) \
   #                               .filter(F.col('Weapon Used Cd').isNotNull()) \
   #                                 .withColumn('year', F.year('Date Rptd'))
    
    result = joined_result.groupBy('AREA NAME') \
        .agg(
            (F.sum('closest_distance') / F.count('*')).alias('average_distance'),
            F.count('*').alias('#')
        ) \
        .orderBy(F.col('#').desc()) \
        .withColumnRenamed('AREA NAME', 'division')
    
    result.show()
    end_time = time.time()
    print(f'Method : {method} | Time {end_time - start_time}')
    return end_time - start_time

In [9]:
query_4_2b('BROADCAST')

24/01/08 20:47:37 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildRight, Cross
   :- Project [DR_NO#0, Date Rptd#80, DATE OCC#110, TIME OCC#3, AREA #4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 6 more fields]
   :  +- Project [DR_NO#0, gettimestamp(Date Rptd#1, MM/dd/yyyy hh:mm:ss a, TimestampType, Some(Europe/Athens), false) AS Date Rptd#80, gettimestamp(DATE OCC#2, MM/dd/yyyy hh:mm:ss a, TimestampType, Some(Europe/Athens), false) AS DATE OCC#110, TIME OCC#3, AREA #4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 5 more fie

                                                                                

+-----------+------------------+-----+
|   division|  average_distance|    #|
+-----------+------------------+-----+
|77th Street|  12.5671222967951|94853|
|  Southeast| 25.60129873879751|87905|
|  Southwest|   9.3291866625766|72814|
|    Central|23.269811961478677|63606|
|     Newton|13.433184721292792|61408|
|    Olympic|36.348197450528986|60925|
|    Rampart|19.629568008429306|55881|
|  Hollywood|27.671748021149362|51255|
|    Mission|34.800525588613944|48956|
|    Pacific| 24.82546195967402|43019|
|   Foothill|29.797575113580432|41625|
| Hollenbeck| 19.52127635437576|41540|
|N Hollywood|17.691483131562677|41151|
|     Harbor| 13.98202798940977|40854|
|    Topanga| 6.096105827278326|39337|
|   Wilshire|15.736618712905988|37930|
|  Northeast|12.367411075059353|37334|
| Devonshire|23.046036251700613|36902|
|   Van Nuys| 19.78805706517812|36264|
|West Valley|14.718040243069364|34005|
+-----------+------------------+-----+
only showing top 20 rows

Method : BROADCAST | Time 47.336594343

47.336594343185425

In [10]:
# 2b)
def query_4_2b_new(method = 'CONTINUE'):
    start_time = time.time()
    
    if method == 'BROADCAST':
        combined_data = crime_data.hint(method).crossJoin(broadcast(lapd_stations))
        combined_data.explain()
        
    elif method in ['MERGE', 'SHUFFLE_HASH', 'SHUFFLE_REPLICATE_NL']:
        combined_data = crime_data.hint(method).crossJoin(lapd_stations)
        combined_data.explain()
        
    elif method == 'CONTINUE':
        combined_data = crime_data.crossJoin(lapd_stations)
    else:
        return None

    
    combined_data = combined_data.withColumn("closest_distance", haversine_udf(col("LON"), col("LAT"), col("X"), col("Y")))
    
    windowSpec = Window.partitionBy("DR_NO").orderBy("closest_distance")
    closest_stations = combined_data.withColumn("rank", rank().over(windowSpec)).filter(col("rank") == 1)
    
    final_data = closest_stations.select(col("DR_NO"), col("DIVISION").alias("closest_station"), col("closest_distance"))
    
    result = crime_data.join(final_data, "DR_NO")
    crime_data_join_stations = result.withColumn('Weapon Used Cd', col('Weapon Used Cd').cast('int')) \
                                    .filter(F.col('Weapon Used Cd').isNotNull()) \
                                    .withColumn('year', F.year('Date Rptd'))
    
    result = crime_data_join_stations.groupBy('AREA NAME') \
        .agg(
            (F.sum('closest_distance') / F.count('*')).alias('average_distance'),
            F.count('*').alias('#')
        ) \
        .orderBy(F.col('#').desc()) \
        .withColumnRenamed('AREA NAME', 'division')
    
    result.show()
    end_time = time.time()
    print(f'Method : {method} | Time {end_time - start_time}')
    return end_time - start_time

In [11]:
 query_4_2b_new('BROADCAST')

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildRight, Cross
   :- Project [DR_NO#0, gettimestamp(Date Rptd#1, MM/dd/yyyy hh:mm:ss a, TimestampType, Some(Europe/Athens), false) AS Date Rptd#80, gettimestamp(DATE OCC#2, MM/dd/yyyy hh:mm:ss a, TimestampType, Some(Europe/Athens), false) AS DATE OCC#110, TIME OCC#3, AREA #4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 5 more fields]
   :  +- FileScan parquet [DR_NO#0,Date Rptd#1,DATE OCC#2,TIME OCC#3,AREA #4,AREA NAME#5,Rpt Dist No#6,Part 1-2#7,Crm Cd#8,Crm Cd Desc#9,Mocodes#10,Vict Age#11,Vict Sex#12,Vict Descent#13,Premis Cd#14,Premis Desc#15,Weapon Used Cd#16,Weapon Desc#17,Status#18,Status Desc#19,Crm Cd 1#20,Crm Cd 2#21,Crm Cd 3#22,Crm Cd 4#23,... 4 more fields] Batched: true



+-----------+------------------+-----+
|   division|  average_distance|    #|
+-----------+------------------+-----+
|77th Street|12.567122296795125|94853|
|  Southeast| 25.60129873879752|87905|
|  Southwest| 9.329186662576602|72814|
|    Central|23.269811961478677|63606|
|     Newton|13.433184721292793|61408|
|    Olympic|   36.348197450529|60925|
|    Rampart|19.629568008429306|55881|
|  Hollywood|27.671748021149348|51255|
|    Mission| 34.80052558861396|48956|
|    Pacific| 24.82546195967405|43019|
|   Foothill|29.797575113580425|41625|
| Hollenbeck| 19.52127635437576|41540|
|N Hollywood| 17.69148313156268|41151|
|     Harbor|13.982027989409763|40854|
|    Topanga| 6.096105827278324|39337|
|   Wilshire|15.736618712905988|37930|
|  Northeast|12.367411075059353|37334|
| Devonshire|23.046036251700617|36902|
|   Van Nuys|19.788057065178123|36264|
|West Valley| 14.71804024306935|34005|
+-----------+------------------+-----+
only showing top 20 rows

Method : BROADCAST | Time 97.072289228

                                                                                

97.07228922843933

In [9]:
start_time = time.time()

lapd_stations_new = lapd_stations.withColumnRenamed('PREC','AREA')

print(lapd_stations_new.count())

print(crime_data.count())

crime_data_filtered = crime_data.withColumn('Weapon Used Cd', col('Weapon Used Cd').cast('int')) \
                                  .filter(col('Weapon Used Cd') < 200) 
print(crime_data_filtered.count())

#combined_data = crime_data_filtered.hint('BROADCAST').crossJoin(broadcast(lapd_stations))
crime_data_join_stations = crime_data_filtered.withColumnRenamed('AREA ', 'AREA') \
                                    .join(broadcast(lapd_stations_new), 'AREA', 'inner')

print(crime_data_join_stations.count())
end_time = time.time()
print(end_time - start_time)

21
2993433
109319
72140
2.5798370838165283


In [None]:
crime_data.count()

In [None]:
spark.conf.get("spark.executor.instances")

In [None]:
#crime_data.show()

In [None]:
#crime_data.printSchema()

#### Change Column Types 
Στην εκφωνηση λέει : Διατηρώντας τα αρχικά ονόματα στηλών 

εννοωντας οτι δεν μπορουμε να κανουμε το 'Date Rptd' -> 'Date_Rptd' ?
- Date Rptd: date
- DATE OCC: date
- Vict Age: integer
- LAT: double
- LON: double

In [None]:
# code for column type changing
crime_data = crime_data.withColumn("Date Rptd", to_timestamp("Date Rptd", 'MM/dd/yyyy hh:mm:ss a')) \
    .withColumn("DATE OCC", to_timestamp("DATE OCC", 'MM/dd/yyyy hh:mm:ss a')) \
    .withColumn("Vict Age", col("Vict Age").cast("int")) \
    .withColumn("LAT", col("LAT").cast("double")) \
    .withColumn("LON", col("LON").cast("double")) \
    .withColumn("Premis_Desc", col("Premis Desc"))

# 3rd Query :

        find the 3 zip codes with min and max household income
                    |
                    |
                    v
        // filter(remove) victimless crimes
                    |
                    |
                    v
        select vict_desc, COUNT(*) as count
        where year=2015
        group by vict_desc
        order by count DESC

In [None]:
# write code for 3rd query here
def query_3(method = 'CONTINUE'):
    start_time = time.time()
    #crime_data_2015 = crime_data.filter(year(col('Date Rptd')) == 2015)
    if method == 'BROADCAST':
        crime_data_join_revge = crime_data.join(broadcast(revge), ['LAT', 'LON'], 'inner') \
            .withColumnRenamed('ZIPcode', 'Zip Code') \
            .withColumn("Zip Code", col("Zip Code").cast("int")) \
            .filter((col('Vict Descent') != 'X') & (col('Vict Sex') != 'X'))
        #crime_data_join_revge.explain()
        
    elif method in ['MERGE', 'SHUFFLE_HASH', 'SHUFFLE_REPLICATE_NL']:
        crime_data_join_revge = crime_data.hint(method).join(revge, ['LAT', 'LON'], 'inner') \
            .withColumnRenamed('ZIPcode', 'Zip Code') \
            .withColumn("Zip Code", col("Zip Code").cast("int")) \
            .filter((col('Vict Descent') != 'X') & (col('Vict Sex') != 'X'))
        #crime_data_join_revge.explain()
        
    elif method == 'CONTINUE':
        crime_data_join_revge = crime_data.join(revge, ['LAT', 'LON'], 'inner') \
            .withColumnRenamed('ZIPcode', 'Zip Code') \
            .withColumn("Zip Code", col("Zip Code").cast("int")) \
            .filter((col('Vict Descent') != 'X') & (col('Vict Sex') != 'X'))
        
    else:
        return None

    #crime_data_join_revge = crime_data.join(revge, ['LAT', 'LON'], 'inner') \
    #    .withColumnRenamed('ZIPcode', 'Zip Code') \
    #    .withColumn("Zip Code", col("Zip Code").cast("int")) \
    #    .filter((col('Vict Descent') != 'X') & (col('Vict Sex') != 'X'))
    
    crime_data_join_income = crime_data_join_revge.join(income, 'Zip Code', 'inner') \
                                .withColumn('Estimated Median Income', 
                                            regexp_replace(col('Estimated Median Income'), '[$,]', '')) \
                                .withColumn('Estimated Median Income', 
                                            col('Estimated Median Income') \
                                .cast('double'))
    
    max_income_zip_codes = crime_data_join_income.groupBy('Zip Code') \
                            .agg({'Estimated Median Income': 'max'}) \
                            .withColumnRenamed('max(Estimated Median Income)', 'MaxIncome') \
                            .orderBy(col('MaxIncome').desc()) \
                            .limit(3)
    
    min_income_zip_codes = crime_data_join_income.groupBy('Zip Code') \
                            .agg({'Estimated Median Income': 'min'}) \
                            .withColumnRenamed('min(Estimated Median Income)', 'MinIncome') \
                            .orderBy(col('MinIncome')) \
                            .limit(3)
    
    zip_codes = min_income_zip_codes.union(max_income_zip_codes)
    
    zip_codes_list = [row['Zip Code'] for row in zip_codes.collect()]
    
    result = crime_data_join_income \
                .filter(col('Zip Code').isin(zip_codes_list)) \
                .filter(year(col('Date Rptd')) == 2015) \
                .groupBy('Vict Descent') \
                .count() \
                .withColumnRenamed('count', '#') \
                .orderBy(col('#').desc())
    
    result.show()
    
    end_time = time.time()
    
    num_executors = spark.conf.get("spark.executor.instances")
    print(f'{num_executors} Executors')
    
    result.explain()
    print(f'Method : {method} | Time {end_time - start_time}')
    
    return end_time - start_time

# Query 3 on 2 Executors

In [None]:
query_3()

In [None]:
spark.stop()