In [1]:
import findspark

# Add the local Spark installation's Python libraries to sys.path so that the
# pyspark imports in the next cell resolve; this must run before them.
findspark.init()

In [2]:
from pyspark.sql.functions import col, month, substring, max, avg, sum, count, min
from pyspark.sql import SparkSession
import os

In [3]:
# One SparkSession shared by every cell below; only ERROR-level Spark log
# messages are printed from here on.
spark = (
    SparkSession.builder
    .appName("Jupyter Demo")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

23/12/20 16:26:36 WARN util.Utils: Your hostname, node1 resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s3)
23/12/20 16:26:36 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/12/20 16:26:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Sales

In [4]:
def create_sales(years, base_path='hdfs://localhost:8020/user/bd_project/batch_layer/neighborhood_sales'):
    """Load yearly neighborhood-sales CSV files from HDFS and stack them.

    ``{base_path}/{year}.csv`` is read for every entry of ``years`` (header row,
    Spark schema inference) and the per-year frames are combined with
    ``DataFrame.union``, which matches columns by position, not by name.

    Args:
        years: Non-empty iterable of years; each is formatted into the file name.
        base_path: HDFS directory holding one CSV file per year.

    Returns:
        A single Spark DataFrame with the rows of every requested year.

    Raises:
        ValueError: if ``years`` is empty.
    """
    from functools import reduce

    years = list(years)
    if not years:
        raise ValueError("create_sales() needs at least one year")

    def _read_year(year):
        # One CSV per year with a header row; inferschema lets Spark type the columns.
        return (spark.read
                .option("header", "true")
                .option("inferschema", "true")
                .csv(f'{base_path}/{year}.csv'))

    # Files are read in the order given; union keeps that row order per file.
    return reduce(lambda combined, frame: combined.union(frame), map(_read_year, years))

### Chosen years

In [5]:
# Years whose neighborhood-sales CSVs are loaded; `y` is reused below to build
# the list of 311 files.
y = [2016, 2018, 2021]
df_sales = create_sales(years=y)

                                                                                

## 311 Service

In [6]:
def split_date_column(df):
    """Add ``year``, ``month``, ``day`` and ``hour`` string columns cut from ``created_date``.

    The parts are taken at fixed 1-based character positions (1-4, 6-7, 9-10,
    12-13), so this presumably expects timestamps shaped like
    ``YYYY-MM-DD?HH...`` — TODO confirm against the source data. The new columns
    stay strings; callers cast them when they need numbers.
    """
    created = col('created_date')
    # (new column, 1-based start position, length), applied in this order.
    parts = (
        ('year', 1, 4),
        ('month', 6, 2),
        ('day', 9, 2),
        ('hour', 12, 2),
    )
    result = df
    for name, start, length in parts:
        result = result.withColumn(name, created.substr(start, length))
    return result

def create_sevices(dates, base_path='hdfs://localhost:8020/user/bd_project/batch_layer/311_service'):
    """Load 311 service-request Parquet files from HDFS into one DataFrame.

    (The misspelled function name is kept so existing callers keep working.)

    ``{base_path}/{date}.parquet`` is read for every entry of ``dates``. The
    ``location`` column is dropped from each file, and every later file is
    projected onto the first file's column order before the positional
    ``union``. That way columns line up even if the files store them in a
    different order. The combined frame is then passed through
    ``split_date_column`` to add ``year``/``month``/``day``/``hour`` columns.

    Args:
        dates: Non-empty iterable of path fragments relative to ``base_path``,
            without the ``.parquet`` suffix (e.g. ``'2016/01/2016-01-01'``).
        base_path: HDFS directory that contains the 311 files.

    Returns:
        The combined DataFrame including the extra date-part columns.

    Raises:
        ValueError: if ``dates`` is empty.
    """
    from functools import reduce

    dates = list(dates)
    if not dates:
        raise ValueError("create_sevices() needs at least one date")

    def _read(date):
        # Parquet carries its own schema, so CSV-style options such as
        # header/inferschema do not apply here.
        return spark.read.parquet(f'{base_path}/{date}.parquet').drop('location')

    first = _read(dates[0])
    cols = first.columns
    combined = reduce(lambda acc, date: acc.union(_read(date).select(cols)), dates[1:], first)
    return split_date_column(combined)

### Chosen dates

In [7]:
# Relative paths of the 311 files to load, formatted "<year>/<MM>/<year>-<MM>-<DD>".
# There is one path per combination of the chosen years `y`, `months` and `days`;
# years vary slowest and days fastest.
# The comprehension keeps its loop variables local, so the pyspark `month`
# function imported at the top of the notebook is not overwritten by a string.
months = ['01', '06']
days = ['01']
dates = [f'{yr}/{mm}/{yr}-{mm}-{dd}' for yr in y for mm in months for dd in days]

In [9]:
df_services = create_sevices(dates=dates)
# Bare expression: the notebook displays the DataFrame's schema repr, not its rows.
df_services

DataFrame[unique_key: string, created_date: string, closed_date: string, agency: string, agency_name: string, complaint_type: string, descriptor: string, location_type: string, incident_zip: string, incident_address: string, street_name: string, cross_street_1: string, cross_street_2: string, address_type: string, city: string, facility_type: string, status: string, due_date: string, resolution_description: string, resolution_action_updated_date: string, community_board: string, bbl: string, borough: string, x_coordinate_state_plane: string, y_coordinate_state_plane: string, open_data_channel_type: string, park_facility_name: string, park_borough: string, latitude: string, longitude: string, intersection_street_1: string, intersection_street_2: string, taxi_company_borough: string, taxi_pick_up_location: string, bridge_highway_name: string, bridge_highway_direction: string, road_ramp: string, bridge_highway_segment: string, vehicle_type: string, landmark: string, year: string, month: s

## Views

In [53]:
def sales_measures(df):
    """Aggregate sales per borough and register the result as a temp view.

    Returns:
        ``(DataFrame, view_name)``. The DataFrame has one row per ``borough``
        with these columns:

        - ``SalesNumber``: sum of ``number_of_sales``.
        - ``MinPrice``: min of ``lowest_sale_price``.
        - ``AvgPrice``: mean of ``average_sale_price``.
        - ``MaxPrice``: max of ``highest_sale_price``.

        Rows are unordered. The view is named ``sales_measures``.

    NOTE(review): ``AvgPrice`` is an unweighted mean of per-row averages, not a
    sales-weighted average price.
    """
    view_name = "sales_measures"
    aggregations = [
        sum('number_of_sales').alias('SalesNumber'),
        min('lowest_sale_price').alias('MinPrice'),
        avg('average_sale_price').alias('AvgPrice'),
        max('highest_sale_price').alias('MaxPrice'),
    ]
    per_borough = df.groupBy('borough').agg(*aggregations)
    per_borough.createOrReplaceTempView(view_name)
    return per_borough, view_name

def sales_measures_by_neighborhood(df):
    """Aggregate sales per (year, borough, neighborhood) and register a temp view.

    The view ``sales_measures_by_n`` is registered from the unsorted aggregate.
    Only the returned DataFrame is ordered, by ``borough`` and then ``year``.

    Returns:
        ``(DataFrame, view_name)``. The DataFrame has columns ``year``,
        ``borough``, ``neighborhood``, ``SalesNumber``, ``MinPrice``,
        ``AvgPrice`` and ``MaxPrice``, computed as in the per-borough measures.
    """
    view_name = "sales_measures_by_n"
    keys = ('year', 'borough', 'neighborhood')
    metrics = (
        sum('number_of_sales').alias('SalesNumber'),
        min('lowest_sale_price').alias('MinPrice'),
        avg('average_sale_price').alias('AvgPrice'),
        max('highest_sale_price').alias('MaxPrice'),
    )
    grouped = df.groupBy(*keys).agg(*metrics)
    grouped.createOrReplaceTempView(view_name)
    return grouped.orderBy('borough', 'year'), view_name



def get_popular_incidents(df):
    """Find the most frequent ``complaint_type`` (the mode) in each borough.

    Returns:
        ``(DataFrame, view_name)``. The DataFrame has columns ``borough``,
        ``ComplaintCount`` and ``complaint_type``, with exactly one row per
        borough, sorted by ``ComplaintCount`` descending. The unsorted result is
        registered as the temp view ``popular_incidents``.

    When several complaint types tie for the top count, the alphabetically
    first ``complaint_type`` is kept. This keeps the result deterministic and
    stops joins on ``borough`` from gaining duplicate rows.
    """
    from pyspark.sql import Window
    from pyspark.sql.functions import row_number

    # Number of (non-null) complaints of each type within each borough.
    count_df = (df.groupBy('borough', 'complaint_type')
                  .agg(count('complaint_type').alias('ComplaintCount')))

    # Rank types inside each borough by count, with a name tie-break; rank 1 is the mode.
    by_count = (Window.partitionBy('borough')
                      .orderBy(col('ComplaintCount').desc(), col('complaint_type').asc()))
    top = (count_df
           .withColumn('_rank', row_number().over(by_count))
           .filter(col('_rank') == 1)
           .select('borough', 'ComplaintCount', 'complaint_type'))

    view_name = "popular_incidents"
    top.createOrReplaceTempView(view_name)
    return top.orderBy('ComplaintCount', ascending=False), view_name

def get_incidents_by_borough(df):
    """Count complaints per borough and register them as the ``incidents_by_borough`` view.

    ``count('complaint_type')`` counts only rows whose ``complaint_type`` is not NULL.

    Returns:
        ``(DataFrame, view_name)`` with columns ``borough`` and
        ``ComplaintCount``; rows are unordered.
    """
    view_name = "incidents_by_borough"
    per_borough = (
        df.groupBy('borough')
          .agg(count('complaint_type').alias('ComplaintCount'))
    )
    per_borough.createOrReplaceTempView(view_name)
    return per_borough, view_name

def get_incidents_by_borough_time(df):
    """Count complaints per borough and calendar day, and register a temp view.

    The grouping keys are ``borough``, ``year``, ``month`` and ``day``; the
    date-part columns come from ``split_date_column``. The view is named
    ``incidents_by_borough_time``.

    Returns:
        ``(DataFrame, view_name)`` with the grouping keys plus
        ``ComplaintCount``; rows are unordered.
    """
    view_name = "incidents_by_borough_time"
    keys = ['borough', 'year', 'month', 'day']
    per_day = df.groupBy(*keys).agg(count('complaint_type').alias('ComplaintCount'))
    per_day.createOrReplaceTempView(view_name)
    return per_day, view_name


def get_incidents_by_type(df):
    """Count complaints per (borough, location_type) and register them as a temp view.

    The view ``incidents_by_type`` holds the unsorted counts. The returned
    DataFrame is sorted by ``ComplaintCount``, highest first.

    Returns:
        ``(DataFrame, view_name)``.
    """
    view_name = "incidents_by_type"
    per_location = df.groupBy('borough', 'location_type').agg(
        count('complaint_type').alias('ComplaintCount')
    )
    per_location.createOrReplaceTempView(view_name)
    ranked = per_location.orderBy('ComplaintCount', ascending=False)
    return ranked, view_name




def get_incidents_by_channel_type(df):
    """Count complaints per (year, open_data_channel_type) and register a temp view.

    The view ``incidents_by_channel_type`` holds the unsorted counts. The
    returned DataFrame is sorted by ``ComplaintCount``, highest first.

    Returns:
        ``(DataFrame, view_name)``.
    """
    view_name = "incidents_by_channel_type"
    per_channel = (
        df.groupBy('year', 'open_data_channel_type')
          .agg(count('complaint_type').alias('ComplaintCount'))
    )
    per_channel.createOrReplaceTempView(view_name)
    return per_channel.orderBy('ComplaintCount', ascending=False), view_name


def get_incidents_by_street(df, hours):
    """Count complaints per (borough, street_name) inside an hour-of-day window.

    Args:
        df: 311 DataFrame with ``hour``, ``street_name``, ``borough`` and
            ``complaint_type`` columns (as produced by ``create_sevices``, where
            ``hour`` is a two-character string).
        hours: ``(start, end)`` hour bounds, both inclusive. When ``start > end``
            the window wraps past midnight, e.g. ``(22, 2)`` keeps hours 22, 23,
            0, 1 and 2.

    A row is dropped in any of these cases:

    - ``street_name`` is ``'N/A'``.
    - ``street_name`` is NULL, because ``!=`` against NULL is not true in Spark SQL.
    - ``hour`` does not cast to an integer.

    Returns:
        ``(DataFrame, view_name)``, with the DataFrame sorted by
        ``ComplaintCount`` descending. The unsorted counts are registered as the
        temp view ``incidents_by_street``.
    """
    start, end = hours[0], hours[1]

    # Compare hours numerically; casting inline avoids adding or renaming columns,
    # so the filter does not depend on column-name case sensitivity.
    hour = col('hour').cast('int')
    if start <= end:
        in_window = (hour >= start) & (hour <= end)
    else:
        # Window crosses midnight: late hours OR early hours.
        in_window = (hour >= start) | (hour <= end)

    has_street = col('street_name') != 'N/A'
    count_df = (df.filter(has_street & in_window)
                  .groupBy('borough', 'street_name')
                  .agg(count('complaint_type').alias('ComplaintCount')))

    view_name = "incidents_by_street"
    count_df.createOrReplaceTempView(view_name)
    return count_df.orderBy('ComplaintCount', ascending=False), view_name

## 10 neighborhoods with most houses sold in 2021

In [65]:
# Top 10 neighborhoods by number of sales in `year`, with their price statistics.
year = 2021
# Only the registered view's name is used; the returned DataFrame is discarded.
_, name = sales_measures_by_neighborhood(df_sales)
view_df = spark.sql(f'''
    SELECT neighborhood, borough, SalesNumber, MinPrice, ROUND(AvgPrice, 0) as AvgPrice, MaxPrice
    FROM {name}
    WHERE year = {year}
    ORDER BY SalesNumber DESC
    LIMIT 10
    ''')
view_df.show()



+------------------+-------------+-----------+--------+---------+---------+
|      neighborhood|      borough|SalesNumber|MinPrice| AvgPrice| MaxPrice|
+------------------+-------------+-----------+--------+---------+---------+
|    FLUSHING-NORTH|       QUEENS|        767|  200000|1249121.0|6600000.0|
|       GREAT KILLS|STATEN ISLAND|        636|  200000| 700240.0|1400000.0|
|           BAYSIDE|       QUEENS|        528|  200000|1152375.0|2460000.0|
|        ST. ALBANS|       QUEENS|        412|  211975| 624832.0|1374243.0|
|      BOROUGH PARK|     BROOKLYN|        401|  200000|1448466.0|3700000.0|
|BEDFORD STUYVESANT|     BROOKLYN|        398|  230000|1558251.0|4550000.0|
|    QUEENS VILLAGE|       QUEENS|        387|  200000| 746159.0|1400000.0|
|     RICHMOND HILL|       QUEENS|        381|  247500| 818856.0|1820000.0|
|    MIDDLE VILLAGE|       QUEENS|        364|  267142| 964018.0|2700000.0|
|     EAST NEW YORK|     BROOKLYN|        356|  260000| 701756.0|1347775.0|
+-----------

                                                                                

## 10 neighborhoods with the lowest average house price in 2021

In [66]:
# 10 neighborhoods with the lowest (rounded) average sale price in `year`.
year = 2021
# Only the registered view's name is used; the returned DataFrame is discarded.
_, name = sales_measures_by_neighborhood(df_sales)
# NOTE(review): neighborhoods with only a handful of sales are included, so
# consider a minimum SalesNumber filter if these averages look noisy.
view_df = spark.sql(f'''
    SELECT neighborhood, borough, ROUND(AvgPrice, 0) as AvgPrice, SalesNumber
    FROM {name}
    WHERE year = {year}
    ORDER BY AvgPrice
    LIMIT 10
    ''')
view_df.show()



+--------------------+-------------+--------+-----------+
|        neighborhood|      borough|AvgPrice|SalesNumber|
+--------------------+-------------+--------+-----------+
|          PORT IVORY|STATEN ISLAND|457652.0|         16|
|       BROAD CHANNEL|       QUEENS|460297.0|         35|
|         HUNTS POINT|        BRONX|468387.0|          8|
|DONGAN HILLS-OLD ...|STATEN ISLAND|479000.0|          3|
|   CONCORD-FOX HILLS|STATEN ISLAND|496194.0|         36|
|     MARINERS HARBOR|STATEN ISLAND|501928.0|        153|
|             CONCORD|STATEN ISLAND|515184.0|         61|
|       PORT RICHMOND|STATEN ISLAND|516867.0|        155|
|       MIDLAND BEACH|STATEN ISLAND|533400.0|        199|
|             ARVERNE|       QUEENS|572021.0|        117|
+--------------------+-------------+--------+-----------+



                                                                                

## 10 most common incident location types

In [47]:
# Sum the per-borough counts into a city-wide total for each location type.
_, name = get_incidents_by_type(df_services)
# `!= 'N/A'` also drops NULL location types. Values are not case-normalized, so
# differently cased variants of the same type appear to be counted separately.
view_df = spark.sql(f'''
    SELECT location_type, SUM(ComplaintCount) as NumberOfComplaints
    FROM {name}
    WHERE location_type != 'N/A'
    GROUP BY location_type
    ORDER BY NumberOfComplaints DESC
    LIMIT 10
    ''')
view_df.show()



+--------------------+------------------+
|       location_type|NumberOfComplaints|
+--------------------+------------------+
|RESIDENTIAL BUILDING|             96923|
|     Street/Sidewalk|             61402|
|            Sidewalk|             33508|
|Residential Build...|             33300|
|              Street|             23756|
|    Store/Commercial|              5101|
|                Park|              2864|
| Club/Bar/Restaurant|              2317|
|3+ Family Apt. Bu...|              2011|
|               Other|              1585|
+--------------------+------------------+



                                                                                

## Number of complaints by channel type in selected years

In [48]:
# Complaints per reporting channel and year; 'UNKNOWN' (and NULL) channels are excluded.
_, name = get_incidents_by_channel_type(df_services)
view_df = spark.sql(f'''
    SELECT *
    FROM {name}
    WHERE  open_data_channel_type != 'UNKNOWN'
    ORDER BY open_data_channel_type,  year
    ''') 
view_df.show()



+----+----------------------+--------------+
|year|open_data_channel_type|ComplaintCount|
+----+----------------------+--------------+
|2016|                MOBILE|          9234|
|2018|                MOBILE|         13291|
|2021|                MOBILE|         19947|
|2016|                ONLINE|         20326|
|2018|                ONLINE|         29793|
|2021|                ONLINE|         42945|
|2016|                 OTHER|           927|
|2018|                 OTHER|          1425|
|2021|                 OTHER|            32|
|2016|                 PHONE|         55360|
|2018|                 PHONE|         68470|
|2021|                 PHONE|         46535|
+----+----------------------+--------------+



                                                                                

## 10 streets with the most incidents between 4 PM and 11 PM (hours 16–22)

In [49]:
# Hours 16-22 inclusive, i.e. 4:00 PM up to 10:59 PM.
_, name = get_incidents_by_street(df_services, [16, 22])
view_df = spark.sql(f'''
    SELECT *
    FROM {name}
    ORDER BY ComplaintCount DESC
    LIMIT 10
    ''')
view_df.show()



+---------+------------------+--------------+
|  borough|       street_name|ComplaintCount|
+---------+------------------+--------------+
|    BRONX|   GRAND CONCOURSE|           879|
|MANHATTAN|          BROADWAY|           827|
|MANHATTAN|  AMSTERDAM AVENUE|           529|
|    BRONX|  EAST  230 STREET|           475|
| BROOKLYN|      OCEAN AVENUE|           416|
|    BRONX|  EAST  231 STREET|           405|
|    BRONX|     MORRIS AVENUE|           404|
|MANHATTAN|ST NICHOLAS AVENUE|           380|
|MANHATTAN|          2 AVENUE|           320|
|   QUEENS| PARSONS BOULEVARD|           313|
+---------+------------------+--------------+



                                                                                

## Boroughs with the most incidents in the morning (7 AM to noon, hours 7–11)

In [50]:
# Hours 7-11 inclusive, i.e. 7:00 AM up to 11:59 AM.
# NOTE(review): this reuses the street-level aggregation, so rows whose
# street_name is 'N/A' or missing are not counted in these borough totals.
_, name = get_incidents_by_street(df_services, [7, 11])
view_df = spark.sql(f'''
    SELECT borough, sum(ComplaintCount) as NumberOfComplaints
    FROM {name}
    GROUP BY borough
    ORDER BY NumberOfComplaints DESC
    LIMIT 10
    ''')
view_df.show()



+-------------+------------------+
|      borough|NumberOfComplaints|
+-------------+------------------+
|     BROOKLYN|             25849|
|       QUEENS|             17430|
|        BRONX|             15161|
|    MANHATTAN|             14712|
|STATEN ISLAND|              4240|
|  Unspecified|                90|
+-------------+------------------+



                                                                                

## Boroughs with the most incidents in the evening (from 6 PM, hours 18–23)

In [51]:
# Hours 18-23 inclusive, i.e. from 6:00 PM to midnight (a timestamp's hour
# never exceeds 23, so this is the full evening window).
# NOTE(review): this reuses the street-level aggregation, so rows whose
# street_name is 'N/A' or missing are not counted in these borough totals.
_, name = get_incidents_by_street(df_services, [18, 23])
view_df = spark.sql(f'''
    SELECT borough, sum(ComplaintCount) as NumberOfComplaints
    FROM {name}
    GROUP BY borough
    ORDER BY NumberOfComplaints DESC
    LIMIT 10
    ''')
view_df.show()



+-------------+------------------+
|      borough|NumberOfComplaints|
+-------------+------------------+
|     BROOKLYN|             25053|
|        BRONX|             18358|
|       QUEENS|             18090|
|    MANHATTAN|             17484|
|STATEN ISLAND|              3011|
|  Unspecified|               133|
+-------------+------------------+



                                                                                

## Borough summary: house prices, number of complaints and most common complaint type

In [63]:
# Each helper registers a temp view; only the view names feed the SQL below
# (df1, df2 and df3 are not used further in this cell).
df1, name1 = sales_measures(df_sales)
df2, name2 = get_incidents_by_borough(df_services)
df3, name3 = get_popular_incidents(df_services)
# Inner joins keep only boroughs present in all three views, so e.g. the
# 311 'Unspecified' borough has no sales match and drops out.
# NOTE(review): if the popular-incidents view holds more than one row for a
# borough (tied top complaint types), that borough's row is repeated here.
result = spark.sql(f'''
    SELECT s.borough, SalesNumber, MinPrice, ROUND(AvgPrice, 0) as AvgPrice, MaxPrice, i.ComplaintCount, p.complaint_type AS MostPopularComplaint
    FROM {name1} s INNER JOIN {name2} i ON s.borough = i.borough
    INNER JOIN {name3} p ON s.borough = p.borough
    ORDER BY AvgPrice
    ''')
result.show()



+-------------+-----------+--------+---------+---------+--------------+--------------------+
|      borough|SalesNumber|MinPrice| AvgPrice| MaxPrice|ComplaintCount|MostPopularComplaint|
+-------------+-----------+--------+---------+---------+--------------+--------------------+
|        BRONX|       8490|  200000| 628191.0|8000000.0|         70158|      HEAT/HOT WATER|
|STATEN ISLAND|      15360|  200000| 645265.0|    1.7E7|         16073|Request Large Bul...|
|       QUEENS|      29876|  200000| 835233.0|7750000.0|         79066|     Illegal Parking|
|     BROOKLYN|      20413|  200000|1621919.0|   2.25E7|        105544|      HEAT/HOT WATER|
|    MANHATTAN|        699|  250000|5913766.0|    5.9E7|         68792|      HEAT/HOT WATER|
+-------------+-----------+--------+---------+---------+--------------+--------------------+



                                                                                