#### SPARK CONFIGURATION

In [1]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "4",
        "spark.executor.memory": "1g",
        "spark.executor.cores": "1",
        "spark.driver.memory": "2g"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3283,application_1732639283265_3239,pyspark,idle,Link,Link,,
3307,application_1732639283265_3263,pyspark,idle,Link,Link,,
3399,application_1732639283265_3355,pyspark,idle,Link,Link,,
3417,application_1732639283265_3373,pyspark,idle,Link,Link,,
3419,application_1732639283265_3375,pyspark,idle,Link,Link,,
3420,application_1732639283265_3376,pyspark,idle,Link,Link,,
3421,application_1732639283265_3377,pyspark,idle,Link,Link,,
3423,application_1732639283265_3379,pyspark,idle,Link,Link,,
3426,application_1732639283265_3382,pyspark,idle,Link,Link,,
3429,application_1732639283265_3385,pyspark,idle,Link,Link,,


In [2]:
# Access configuration
conf = spark.sparkContext.getConf()

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))
print("Executor Memory:", conf.get("spark.executor.memory"))
print("Executor Cores:", conf.get("spark.executor.cores"))

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3515,application_1732639283265_3471,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executor Instances: 4
Executor Memory: 1g
Executor Cores: 1

#### QUERY 1: RDD API

In [5]:
from pyspark.sql import SparkSession
import time
import re
import csv  

# Initialize SparkContext
sc = SparkSession.builder.appName("Query 1").getOrCreate().sparkContext

# Start timing
start_time = time.time()

def parse_csv(line):
    return next(csv.reader([line]))

crime_data_10_to_19 = sc.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv") \
    .map(parse_csv)
crime_data_20_to_present = sc.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv") \
    .map(parse_csv)

# Filter out the header row
header = crime_data_10_to_19.first()  # Get the first row (header)
crime_data_10_to_19 = crime_data_10_to_19.filter(lambda row: row != header)
crime_data_20_to_present = crime_data_20_to_present.filter(lambda row: row != header)


# Filter for 'aggravated assault' in `Crm Cd Desc` using regex for case insensitivity
aggravated_assault_10_to_19 = crime_data_10_to_19.filter(
    lambda row: row[9] and re.search(r"aggravated assault", row[9].strip(), re.IGNORECASE)
)
aggravated_assault_20_to_present = crime_data_20_to_present.filter(
    lambda row: row[9] and re.search(r"aggravated assault", row[9].strip(), re.IGNORECASE)
)


# Combine both RDDs into one unified RDD
aggravated_assault_unified = aggravated_assault_10_to_19.union(aggravated_assault_20_to_present)

# Print total unified count
print(f"Unified aggravated assault count: {aggravated_assault_unified.count()}")

# Map age to target groups and count occurrences, adding checks for invalid values (negative or null age)
age_group_counts = aggravated_assault_unified \
    .filter(lambda row: row[11].isdigit() and int(row[11]) >= 0) \
    .map(lambda row: int(row[11])) \
    .map(lambda age: ("childs", 1) if age < 18 else  # Childs: age < 18
                     ("young adults", 1) if 18 <= age <= 24 else  # Young Adults: 18-24
                     ("adults", 1) if 25 <= age <= 64 else  # Adults: 25-64
                     ("elderly people", 1)) \
    .reduceByKey(lambda a, b: a + b)  # Count occurrences in each group

# Sort by count in descending order
sorted_age_groups = age_group_counts.sortBy(lambda x: x[1], ascending=False)

print(sorted_age_groups.collect())

# Count cases without age information (non-numeric or negative values in `Vict Age`)
missing_age_count = aggravated_assault_unified.filter(lambda row: not row[11].isdigit() or int(row[11]) < 0).count()
print(f"Count of aggravated assault records without age information: {missing_age_count}")

# Get the total number of elements in the unified RDD
total_elements = aggravated_assault_unified.count()

# Print the total number of elements
print(f"Total number of aggravated assault records: {total_elements}")

# End timing
end_time = time.time()
print(f"Execution time: {round(end_time - start_time, 3)} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Unified aggravated assault count: 176611
[('adults', 121093), ('young adults', 33605), ('childs', 15923), ('elderly people', 5985)]
Count of aggravated assault records without age information: 5
Total number of aggravated assault records: 176611
Execution time: 34.283 seconds

#### QUERY 1: DATAFRAME

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, DoubleType, StringType
from pyspark.sql.functions import col, lower, when
import time

# Δημιουργία SparkSession
spark = SparkSession \
    .builder \
    .appName("DF query 1 execution") \
    .getOrCreate()

# Ορισμός Schema για το DataFrame
crime_data_schema = StructType([
    StructField("dr_no", IntegerType(), False),
    StructField("date_rptd", StringType(), True),
    StructField("date_occ", StringType(), True),
    StructField("time_occ", IntegerType(), True),
    StructField("area", IntegerType(), True),
    StructField("area_name", StringType(), True),
    StructField("rpt_dist_no", IntegerType(), True),
    StructField("part_1_2", IntegerType(), True),
    StructField("crm_cd", IntegerType(), True),
    StructField("crm_cd_desc", StringType(), True),
    StructField("mocodes", StringType(), True),
    StructField("vict_age", IntegerType(), True),
    StructField("vict_sex", StringType(), True),
    StructField("vict_descent", StringType(), True),
    StructField("premis_cd", IntegerType(), True),
    StructField("premis_desc", StringType(), True),
    StructField("weapon_used_cd", IntegerType(), True),
    StructField("weapon_desc", StringType(), True),
    StructField("status", StringType(), True),
    StructField("status_desc", StringType(), True),
    StructField("crm_cd_1", IntegerType(), True),
    StructField("crm_cd_2", IntegerType(), True),
    StructField("crm_cd_3", IntegerType(), True),
    StructField("crm_cd_4", IntegerType(), True),
    StructField("location", StringType(), True),
    StructField("cross_street", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True)
])

# Ξεκίνημα χρονόμετρου
start_time = time.time()

# Φόρτωση των δεδομένων με το schema
crime_data_10_to_19_df = spark.read.format('csv') \
    .options(header='true', dateFormat='03/01/2020 12:00:00 AM') \
    .schema(crime_data_schema) \
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv")

crime_data_20_to_present_df = spark.read.format('csv') \
    .options(header='true', dateFormat='MM/dd/yyyy hh:mm:ss am') \
    .schema(crime_data_schema) \
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv")

# Συγχώνευση των δύο DataFrames
unified_crime_data_df = crime_data_10_to_19_df.union(crime_data_20_to_present_df)

# Φιλτράρισμα για "aggravated assault" στη στήλη crm_cd_desc (χωρίς ευαισθησία στην περίπτωση χαρακτήρων)
aggravated_assault_df = unified_crime_data_df.filter(
    lower(col("crm_cd_desc")).contains("aggravated assault")
)

# Φιλτράρισμα για ελλιπή ή μη έγκυρα δεδομένα στην ηλικία του θύματος (Vict Age)
invalid_age_df = aggravated_assault_df.filter(
    col("vict_age").isNull() | (col("vict_age") < 0)
)

# Προσθήκη στήλης "Age Group" με βάση την ηλικία του θύματος
aggravated_assault_with_age_group = aggravated_assault_df.withColumn(
    "Age Group",
    when(col("vict_age") < 18, "childs")
    .when((col("vict_age") >= 18) & (col("vict_age") <= 24), "young adults")
    .when((col("vict_age") >= 25) & (col("vict_age") <= 64), "adults")
    .when(col("vict_age") > 64, "elderly people")
)

# Ομαδοποίηση κατά Age Group και εμφάνιση των αποτελεσμάτων
age_group_counts = aggravated_assault_with_age_group.groupBy("Age Group").count().orderBy("count", ascending=False)

# Εμφάνιση των αποτελεσμάτων
age_group_counts.show()

# Μετρητής για ελλιπή δεδομένα στην ηλικία
missing_age_count = invalid_age_df.count()
print(f"Count of aggravated assault records without age information: {missing_age_count}")

# Τέλος χρονόμετρου
end_time = time.time()
print(f"Execution time: {round(end_time - start_time, 3)} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+------+
|     Age Group| count|
+--------------+------+
|        adults|121093|
|  young adults| 33605|
|        childs| 15928|
|elderly people|  5985|
+--------------+------+

Count of aggravated assault records without age information: 5
Execution time: 18.423 seconds