In [188]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Build the Spark configuration for the standalone cluster in one fluent chain
# (SparkConf setters return the conf itself).
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Advanced_DF_EX1")
    .setAll([
        ("spark.driver.memory", "2g"),
        ("spark.executor.cores", "1"),
        ("spark.driver.cores", "1"),
    ])
)

# SparkSession is the entry point to the Spark SQL engine; getOrCreate returns
# an already-running session if one exists.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the Google Cloud Storage connector classes for gs:// URIs on the
# Hadoop configuration used by this SparkContext.
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")

# Input CSV files (all in the same GCS data bucket).
pop_gsc_file_path = 'gs://data_de2024_708179/Population_by_Borough_NYC.csv'
trees_gsc_file_path = 'gs://data_de2024_708179/new_york_tree_census_2015.csv'
parties_gsc_file_path = 'gs://data_de2024_708179/party_in_nyc.csv'
barlocations_gsc_file_path = 'gs://data_de2024_708179/bar_locations.csv'
crime_gsc_file_path = 'gs://data_de2024_708179/NYPD_Complaint_Data_Historic.csv'

In [189]:
# Load every source CSV into a Spark DataFrame and print its schema.
# No explicit schema (and no inferSchema) is supplied, so every column is read
# as a string; numeric columns are cast explicitly in the cells that use them.
def _load_csv(path):
    """Read a comma-delimited CSV with a header row and print its schema.

    Args:
        path: URI of the CSV file (the gs:// paths defined in the setup cell).

    Returns:
        The loaded Spark DataFrame; all columns are typed as string.
    """
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("delimiter", ",")
        .load(path)
    )
    df.printSchema()
    return df


pop_df = _load_csv(pop_gsc_file_path)
trees_df = _load_csv(trees_gsc_file_path)
parties_df = _load_csv(parties_gsc_file_path)
bars_df = _load_csv(barlocations_gsc_file_path)
crime_df = _load_csv(crime_gsc_file_path)

root
 |-- Age Group: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- 1950: string (nullable = true)
 |-- 1950 - Boro share of NYC total: string (nullable = true)
 |-- 1960: string (nullable = true)
 |-- 1960 - Boro share of NYC total: string (nullable = true)
 |-- 1970: string (nullable = true)
 |-- 1970 - Boro share of NYC total: string (nullable = true)
 |-- 1980: string (nullable = true)
 |-- 1980 - Boro share of NYC total: string (nullable = true)
 |-- 1990: string (nullable = true)
 |-- 1990 - Boro share of NYC total: string (nullable = true)
 |-- 2000: string (nullable = true)
 |-- 2000 - Boro share of NYC total: string (nullable = true)
 |-- 2010: string (nullable = true)
 |-- 2010 - Boro share of NYC total: string (nullable = true)
 |-- 2020: string (nullable = true)
 |-- 2020 - Boro share of NYC total: string (nullable = true)
 |-- 2030: string (nullable = true)
 |-- 2030 - Boro share of NYC total: string (nullable = true)
 |-- 2040: string (nullable = tru

In [190]:
from pyspark.sql.functions import col, max, min, regexp_replace, upper, count, trim, rank

# One row per borough with its 2020 population as an integer, keyed by a
# trimmed, upper-cased borough name that the other datasets are joined on.
# NOTE(review): the functions import above binds Spark's max/min over Python's
# builtins for the rest of the notebook; no visible cell uses them.
pop_df1 = (
    pop_df
    .select(
        trim(upper(col('Borough'))).alias('Borough'),
        # The 2020 values are read as text; commas are stripped before the int cast.
        regexp_replace(col('`2020`'), ',', '').cast('int').alias('Population'),
    )
    # Filter on the normalised name so whitespace/case variants of the
    # city-wide total row cannot slip through.
    .filter(col('Borough') != 'NYC TOTAL')
)

In [191]:
from pyspark.sql import Window
# NOTE(review): boroughWindow is not referenced by any visible cell; kept only
# so nothing else relying on it breaks — candidate for removal.
boroughWindow = Window.partitionBy('Borough')

# Trees per 1,000 residents per borough. The borough name is trimmed and
# upper-cased the same way as the key in pop_df1 so the inner join matches.
trees_df1 = (
    trees_df
    .select(col('tree_id'), col('boroname'))
    .withColumn('Borough', trim(upper(col('boroname'))))
)

trees_final = (
    trees_df1
    .groupBy('Borough')
    .agg(count('tree_id').alias('nr_trees'))  # count(col) ignores null tree_id values
    .join(pop_df1, on='Borough', how='inner')
    .withColumn('trees per 1000', (col('nr_trees') / col('Population')) * 1000)
)
trees_final.show()

+-------------+--------+----------+------------------+
|      Borough|nr_trees|Population|    trees per 1000|
+-------------+--------+----------+------------------+
|       QUEENS|  250551|   2330295|107.51900510450393|
|     BROOKLYN|  177293|   2648452| 66.94212317232859|
|        BRONX|   85203|   1446788| 58.89114369209587|
|    MANHATTAN|   65423|   1638281| 39.93393074814394|
|STATEN ISLAND|  105318|    487155|216.18991901961388|
+-------------+--------+----------+------------------+



In [192]:
# Rank boroughs by number of party records (rank 1 = most; ties share a rank).
# The window has no partitionBy, so Spark evaluates it in a single partition —
# acceptable here because only one aggregated row per borough reaches it.
window_parties = Window.orderBy(col('nr parties').desc())

final_parties_df = (
    parties_df
    # Normalise the join key like pop_df1 (trim + upper-case) before grouping.
    .withColumn('Borough', trim(upper(col('Borough'))))
    # Drop records without a borough before ranking so only real boroughs are ranked.
    .filter(col('Borough') != 'UNSPECIFIED')
    .groupBy('Borough')
    .agg(count('*').alias('nr parties'))
    .withColumn('most active nightlife', rank().over(window_parties))
    .select('Borough', 'most active nightlife')
)

In [198]:
# Crime records per 100,000 residents for each borough, lowest first.
# BORO_NM is normalised to the same trimmed, upper-case key as pop_df1 and
# named Borough, so the join is a single-column equi-join (no duplicate
# BORO_NM/Borough columns, no reference to crime_df after the groupBy).
# NOTE(review): the numerator counts every row of the complaint file
# (presumably several years, given "Historic"), while the denominator is the
# 2020 population — this is a cumulative rate, not an annual one; confirm.
final_crime_df = (
    crime_df
    .withColumn('Borough', trim(upper(col('BORO_NM'))))
    .groupBy('Borough')
    .agg(count('*').alias('nr crimes'))
    .join(pop_df1, on='Borough', how='inner')
    .withColumn('crimes per 100,000 residents', (col('nr crimes') / col('Population')) * 100000)
    .orderBy(col('crimes per 100,000 residents'))
    .select(
        col('Borough'),
        col('crimes per 100,000 residents').alias('Crime per 100000 residents'),
    )
)

final_crime_df.show()
    


+-------------+--------------------------+
|      Borough|Crime per 100000 residents|
+-------------+--------------------------+
|       QUEENS|         9095.758262365924|
|STATEN ISLAND|        10005.645020578666|
|     BROOKLYN|        11918.207315065556|
|    MANHATTAN|        14939.378531521761|
|        BRONX|        15722.897895199572|
+-------------+--------------------------+



In [201]:
# Combine crime rate, nightlife rank and tree density into one row per borough,
# with snake_case column names for the BigQuery export.
# The chain is wrapped in parentheses instead of using backslash continuations,
# so no trailing "\" can accidentally swallow the following line.
final_df = (
    final_crime_df
    .join(final_parties_df, on='Borough', how='inner')
    .join(trees_final, on='Borough', how='inner')
    .drop('Population', 'nr_trees')
    .withColumnRenamed("Borough", "borough")
    .withColumnRenamed("Crime per 100000 residents", "crime_per_100000")
    .withColumnRenamed("most active nightlife", "rank_nightlife")
    .withColumnRenamed("trees per 1000", "trees_per_1000")
)

final_df.show()

        

+-------------+------------------+--------------+------------------+
|      borough|  crime_per_100000|rank_nightlife|    trees_per_1000|
+-------------+------------------+--------------+------------------+
|        BRONX|15722.897895199572|             3| 58.89114369209587|
|     BROOKLYN|11918.207315065556|             1| 66.94212317232859|
|    MANHATTAN|14939.378531521761|             2| 39.93393074814394|
|       QUEENS| 9095.758262365924|             4|107.51900510450393|
|STATEN ISLAND|10005.645020578666|             5|216.18991901961388|
+-------------+------------------+--------------+------------------+



In [202]:
from google.cloud import bigquery

# Write final_df to BigQuery via the Spark BigQuery connector, which stages the
# data in a temporary Cloud Storage bucket.
# NOTE(review): the gs:// filesystem classes are already registered on this
# SparkContext's Hadoop configuration in the session-setup cell, so they are
# not set a second time here.
# NOTE(review): the `bigquery` client imported above is not used by this cell.
bucket = "de_jads_temp_708179"  # use your bucket
bq_table = 'de2024-435320.assignment_2.boroughinfo'  # project.dataset.table

# Cloud Storage bucket for the connector's temporary export data.
spark.conf.set('temporaryGcsBucket', bucket)

# Saving the data to BigQuery (replaces any existing table contents).
(
    final_df.write.format('bigquery')
    .option('table', bq_table)
    .mode("overwrite")
    .save()
)

In [187]:
# Stop the Spark session (and its underlying SparkContext). Run this last:
# every earlier cell needs the active `spark` session.
spark.stop()