##  Aggregations


### Step 1: Initialize PySpark Session


In [100]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct, approx_count_distinct, first, last, col, min, max, sumDistinct, avg, sum, struct, asc

# Create a Spark session
spark = SparkSession.builder.appName("day3").getOrCreate()


### Step 2: Load the Dataset


In [2]:
# Load the Chipotle dataset into a Spark DataFrame
data_path = "./US_Crime_Rates_1960_2014.csv"  # Replace with the actual path
US_Crime_Rates_1960_2014_df = spark.read.csv(data_path, header=True, inferSchema=True)

# Load the Chipotle dataset into a Spark DataFrame
data_path = "./titanic.csv"  # Replace with the actual path
titanic_df = spark.read.csv(data_path, header=True, inferSchema=True)


                                                                                

In [3]:
US_Crime_Rates_1960_2014_df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Total: integer (nullable = true)
 |-- Violent: integer (nullable = true)
 |-- Property: integer (nullable = true)
 |-- Murder: integer (nullable = true)
 |-- Forcible_Rape: integer (nullable = true)
 |-- Robbery: integer (nullable = true)
 |-- Aggravated_assault: integer (nullable = true)
 |-- Burglary: integer (nullable = true)
 |-- Larceny_Theft: integer (nullable = true)
 |-- Vehicle_Theft: integer (nullable = true)



In [4]:
US_Crime_Rates_1960_2014_df.show()

+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+
|Year|Population|   Total|Violent|Property|Murder|Forcible_Rape|Robbery|Aggravated_assault|Burglary|Larceny_Theft|Vehicle_Theft|
+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+
|1960| 179323175| 3384200| 288460| 3095700|  9110|        17190| 107840|            154320|  912100|      1855400|       328200|
|1961| 182992000| 3488000| 289390| 3198600|  8740|        17220| 106670|            156760|  949600|      1913000|       336000|
|1962| 185771000| 3752200| 301510| 3450700|  8530|        17550| 110860|            164570|  994300|      2089600|       366800|
|1963| 188483000| 4109500| 316970| 3792500|  8640|        17650| 116470|            174210| 1086400|      2297800|       408300|
|1964| 191141000| 4564600| 364220| 4200400|  9360|        21420| 130390|            203050| 12132

In [7]:
# view for US_Crime_Rates_1960_2014_df
US_Crime_Rates_1960_2014_df.createOrReplaceTempView("crime_data")


In [8]:
# view for titanic
titanic_df.createOrReplaceTempView("titanic_data")

### count

Question: How many records are there in the US_Crime_Rates_1960_2014_df DataFrame?

In [6]:
# pyspark

total_records = US_Crime_Rates_1960_2014_df.count()

print("Total number of records are:", total_records)

Total records are: 55


In [9]:
# sparksql

record_count = spark.sql("SELECT COUNT(*) FROM crime_data").collect()[0][0]
print("Total number of records:", record_count)


Total number of records: 55


### countDistinct
Question: How many distinct years are present in the US_Crime_Rates_1960_2014_df DataFrame?
Answer:

In [13]:
# pyspark

dis_years = US_Crime_Rates_1960_2014_df.select(countDistinct("Year")).collect()[0][0]

print("Distinct years present are:", dis_years)

Distinct years present are: 55


In [14]:
# sparksql

dist_year = spark.sql ("SELECT COUNT(DISTINCT Year) FROM crime_data").collect()[0][0]

print("Total distinct years:", dist_year)

Total distinct years: 55


### approx_count_distinct

Question: Estimate the approximate number of distinct values in the "Total" column of the US_Crime_Rates_1960_2014_df DataFrame.

In [18]:
# pyspark

approx = US_Crime_Rates_1960_2014_df.select(approx_count_distinct("total")).collect()[0][0]

print("Approx distinct values in total column are:", approx)

23/09/03 11:48:23 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Approx distinct values in total column are: 55


In [19]:
# sparksql

approx_total = spark.sql("SELECT APPROX_COUNT_DISTINCT(Total) FROM crime_data").collect()[0][0]

print("Approx distinct rows in total column are:", approx_total)

Approx distinct rows in total column are: 55


###  first and last

Question: Find the first and last year in the US_Crime_Rates_1960_2014_df DataFrame.

In [45]:
# pyspark

first_yr = US_Crime_Rates_1960_2014_df.select(first("year")).collect()[0][0]
last_yr = US_Crime_Rates_1960_2014_df.select(last("year")).collect()[0][0]

print("First year is:", first_yr)
print("Last year is:", last_yr)

# first_last = US_Crime_Rates_1960_2014_df.select(first('year'), last('year'))
# first_last.show()

First year is: 1960
Last year is: 2014


In [49]:
# first_last = spark.sql("SELECT FIRST(Year), LAST(Year) FROM crime_data").collect()[0][0]

# print("First and Last year:", first_last)

First and Last year: 1960


In [46]:
# sparksql

first_year = spark.sql("SELECT FIRST(Year) FROM crime_data").collect()[0][0]
last_year = spark.sql("SELECT LAST(Year) FROM crime_data").collect()[0][0]

print("First year:", first_year)
print("Last year", last_year)

First year: 1960
Last year 2014


### min and max

Question: Find the minimum and maximum population values in the US_Crime_Rates_1960_2014_df DataFrame.

In [53]:
# pyspark

min_pop = US_Crime_Rates_1960_2014_df.select(min('Population')).first()[0]
max_pop = US_Crime_Rates_1960_2014_df.select(max('Population')).first()[0]

print("Minimum population:", min_pop)
print("Maximum population", max_pop)

Minimum population: 179323175
Maximum population 318857056


In [60]:
# sparksql

min_pop_result = spark.sql("SELECT MIN(Population) FROM crime_data").first()[0]

max_pop_result = spark.sql("SELECT MAX(Population) FROM crime_data").first()[0]


print("Minimum population:", min_pop_result)
print("Maximum population:", max_pop_result)

Minimum population: 179323175
Maximum population: 318857056


### sumDistinct

Question: Calculate the sum of distinct "Property" values for each year in the US_Crime_Rates_1960_2014_df DataFrame.

In [65]:
# pyspark

sum_distinct_property_by_year = US_Crime_Rates_1960_2014_df.groupBy("Year").agg(sumDistinct("Property").alias("SumDistinctProperty")).orderBy(col('year'))

sum_distinct_property_by_year.show()




+----+-------------------+
|Year|SumDistinctProperty|
+----+-------------------+
|1960|            3095700|
|1961|            3198600|
|1962|            3450700|
|1963|            3792500|
|1964|            4200400|
|1965|            4352000|
|1966|            4793300|
|1967|            5403500|
|1968|            6125200|
|1969|            6749000|
|1970|            7359200|
|1971|            7771700|
|1972|            7413900|
|1973|            7842200|
|1974|            9278700|
|1975|           10252700|
|1976|           10345500|
|1977|            9955000|
|1978|           10123400|
|1979|           11041500|
+----+-------------------+
only showing top 20 rows



In [64]:
# sparksql

sum_distinct_property_year = spark.sql("""
    SELECT Year, SUM(DISTINCT Property) as SumDistinctProperty
    FROM crime_data
    GROUP BY Year
    ORDER BY Year
""")
                                          
sum_distinct_property_year.show()

+----+-------------------+
|Year|SumDistinctProperty|
+----+-------------------+
|1960|            3095700|
|1961|            3198600|
|1962|            3450700|
|1963|            3792500|
|1964|            4200400|
|1965|            4352000|
|1966|            4793300|
|1967|            5403500|
|1968|            6125200|
|1969|            6749000|
|1970|            7359200|
|1971|            7771700|
|1972|            7413900|
|1973|            7842200|
|1974|            9278700|
|1975|           10252700|
|1976|           10345500|
|1977|            9955000|
|1978|           10123400|
|1979|           11041500|
+----+-------------------+
only showing top 20 rows



### avg

Question: Calculate the average "Murder" rate for the entire dataset in the US_Crime_Rates_1960_2014_df DataFrame.
Answer:

In [72]:
# pyspark

avg_murder = US_Crime_Rates_1960_2014_df.select(avg('Murder')).first()[0]
print("Average murder rate:", avg_murder)

Average murder rate: 17317.236363636363


In [71]:
# sparksql

avg_mur = spark.sql("SELECT AVG(Murder) FROM crime_data").first()[0]
print("Average murder rate is:", avg_mur)

Average murder rate is: 17317.236363636363


### Aggregating to Complex Types

Question: Calculate the total sum of "Violent" and "Property" crimes for each year in the US_Crime_Rates_1960_2014_df DataFrame. Store the results in a struct type column.

In [77]:
# pyspark

sum_violent_property_crimes_by_year = US_Crime_Rates_1960_2014_df.groupBy('Year') \
    .agg(
        sum("Violent").alias("TotalViolentCrimes"),
        sum("Property").alias("TotalPropertyCrimes")
    ) \
    .withColumn("TotalCrimes", struct("TotalViolentCrimes", "TotalPropertyCrimes")) \
    .orderBy("Year")

sum_violent_property_crimes_by_year.show()

+----+------------------+-------------------+-------------------+
|Year|TotalViolentCrimes|TotalPropertyCrimes|        TotalCrimes|
+----+------------------+-------------------+-------------------+
|1960|            288460|            3095700|  {288460, 3095700}|
|1961|            289390|            3198600|  {289390, 3198600}|
|1962|            301510|            3450700|  {301510, 3450700}|
|1963|            316970|            3792500|  {316970, 3792500}|
|1964|            364220|            4200400|  {364220, 4200400}|
|1965|            387390|            4352000|  {387390, 4352000}|
|1966|            430180|            4793300|  {430180, 4793300}|
|1967|            499930|            5403500|  {499930, 5403500}|
|1968|            595010|            6125200|  {595010, 6125200}|
|1969|            661870|            6749000|  {661870, 6749000}|
|1970|            738820|            7359200|  {738820, 7359200}|
|1971|            816500|            7771700|  {816500, 7771700}|
|1972|    

In [83]:
# sparksql

sum_crimes_by_year = spark.sql("""
    SELECT Year,
           SUM(Violent) AS TotalViolentCrimes,
           SUM(Property) AS TotalPropertyCrimes,
           STRUCT(SUM(Violent), SUM(Property)) AS TotalCrimes
    FROM crime_data
    GROUP BY Year
    ORDER BY Year
""")
                               
sum_crimes_by_year.show()

+----+------------------+-------------------+-------------------+
|Year|TotalViolentCrimes|TotalPropertyCrimes|        TotalCrimes|
+----+------------------+-------------------+-------------------+
|1960|            288460|            3095700|  {288460, 3095700}|
|1961|            289390|            3198600|  {289390, 3198600}|
|1962|            301510|            3450700|  {301510, 3450700}|
|1963|            316970|            3792500|  {316970, 3792500}|
|1964|            364220|            4200400|  {364220, 4200400}|
|1965|            387390|            4352000|  {387390, 4352000}|
|1966|            430180|            4793300|  {430180, 4793300}|
|1967|            499930|            5403500|  {499930, 5403500}|
|1968|            595010|            6125200|  {595010, 6125200}|
|1969|            661870|            6749000|  {661870, 6749000}|
|1970|            738820|            7359200|  {738820, 7359200}|
|1971|            816500|            7771700|  {816500, 7771700}|
|1972|    

### Grouping

Question: In the given US_Crime_Rates_1960_2014_df DataFrame, you are tasked with finding the average of all crimes combined for each year. Calculate the sum of all crime categories (Violent, Property, Murder, Forcible_Rape, Robbery, Aggravated_assault, Burglary, Larceny_Theft, Vehicle_Theft) for each year and then determine the average of these combined crime sums. Provide the result as the average of all crimes across the entire dataset.

In [84]:
# pyspark

total_sum = US_Crime_Rates_1960_2014_df.withColumn("Total_crime_sum",
                                                    col('violent') + col('property') + col('murder') + col('forcible_rape') + 
                                                    col('robbery') + col('Aggravated_assault') + col('Burglary')+ col('Larceny_Theft')
                                                    + col('Vehicle_Theft')).alias('Total_crime_sum')

# total_sum.show()


avegare_crime = total_sum.agg(avg('total_crime_sum'))
print(f"Average of all crime: {avegare_crime.collect()[0][0]}")

total_sum.select('year','total_crime_sum').show()


Average of all crime: 21201546.145454545
+----+---------------+
|year|total_crime_sum|
+----+---------------+
|1960|        6768320|
|1961|        6975980|
|1962|        7504420|
|1963|        8218940|
|1964|        9129240|
|1965|        9478780|
|1966|       10446960|
|1967|       11806860|
|1968|       13440420|
|1969|       14821740|
|1970|       16196040|
|1971|       17176400|
|1972|       16497600|
|1973|       17436220|
|1974|       20506940|
|1975|       22584730|
|1976|       22699410|
|1977|       21969060|
|1978|       22417910|
|1979|       24499060|
+----+---------------+
only showing top 20 rows



In [95]:
# sparksql

total_sum_crime = spark.sql( """
    SELECT Year,
           (Violent + Property + Murder + Forcible_Rape +
            Robbery + Aggravated_assault + Burglary + Larceny_Theft +
            Vehicle_Theft) AS TotalCrimeSum
    FROM crime_data
""")



# Calculate the average of all crimes combined
average_combined_crime = total_sum_crime.agg({"TotalCrimeSum": "avg"}).collect()[0][0]

# Show the result
print("Average combined crime is:", average_combined_crime)
      
# Show all crimes for every year
total_sum_crime.show()


Average combined crime is: 21201546.145454545
+----+-------------+
|Year|TotalCrimeSum|
+----+-------------+
|1960|      6768320|
|1961|      6975980|
|1962|      7504420|
|1963|      8218940|
|1964|      9129240|
|1965|      9478780|
|1966|     10446960|
|1967|     11806860|
|1968|     13440420|
|1969|     14821740|
|1970|     16196040|
|1971|     17176400|
|1972|     16497600|
|1973|     17436220|
|1974|     20506940|
|1975|     22584730|
|1976|     22699410|
|1977|     21969060|
|1978|     22417910|
|1979|     24499060|
+----+-------------+
only showing top 20 rows



### Window Functions

Question: Calculate the cumulative sum of "Property" values over the years using a window function in the US_Crime_Rates_1960_2014_df DataFrame.

In [97]:
# pyspark
from pyspark.sql.window import Window

# defining a window
window_spec = Window.orderBy("Year")

# calc cummulative sum of property over years by window func
cumulative_sum_property = US_Crime_Rates_1960_2014_df.withColumn(
    "CumulativeSumProperty",
    sum("Property").over(window_spec)
)

cumulative_sum_property.show()

23/09/03 15:43:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 15:43:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 15:43:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 15:43:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 15:43:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+---------------------+
|Year|Population|   Total|Violent|Property|Murder|Forcible_Rape|Robbery|Aggravated_assault|Burglary|Larceny_Theft|Vehicle_Theft|CumulativeSumProperty|
+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+---------------------+
|1960| 179323175| 3384200| 288460| 3095700|  9110|        17190| 107840|            154320|  912100|      1855400|       328200|              3095700|
|1961| 182992000| 3488000| 289390| 3198600|  8740|        17220| 106670|            156760|  949600|      1913000|       336000|              6294300|
|1962| 185771000| 3752200| 301510| 3450700|  8530|        17550| 110860|            164570|  994300|      2089600|       366800|              9745000|
|1963| 188483000| 4109500| 316970| 3792500|  8640|        17650| 116470|            174210| 10

In [98]:
# sparksql

# Define a SQL query to calculate the cumulative sum of "Property" values over the years
cummulative_sum_property= spark.sql("""
    SELECT Year, Property,
           SUM(Property) OVER (ORDER BY Year) AS CumulativeSumProperty
    FROM crime_data
""")

# Show the result
cumulative_sum_property.show()

23/09/03 15:46:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 15:46:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 15:46:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 15:46:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 15:46:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+---------------------+
|Year|Population|   Total|Violent|Property|Murder|Forcible_Rape|Robbery|Aggravated_assault|Burglary|Larceny_Theft|Vehicle_Theft|CumulativeSumProperty|
+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+---------------------+
|1960| 179323175| 3384200| 288460| 3095700|  9110|        17190| 107840|            154320|  912100|      1855400|       328200|              3095700|
|1961| 182992000| 3488000| 289390| 3198600|  8740|        17220| 106670|            156760|  949600|      1913000|       336000|              6294300|
|1962| 185771000| 3752200| 301510| 3450700|  8530|        17550| 110860|            164570|  994300|      2089600|       366800|              9745000|
|1963| 188483000| 4109500| 316970| 3792500|  8640|        17650| 116470|            174210| 10

### Pivot
Question: You are working with a DataFrame named US_Crime_Rates_1960_2014_df that contains crime data for different crime types over the years. 

In [101]:
#created a pivot table of year where the value of robbery of the year is shown.
pivot_table_robery = US_Crime_Rates_1960_2014_df.groupBy('year').pivot('year').sum('robbery').sort(asc('year'))
pivot_table_robery.show()

+----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|year|  1960|  1961|  1962|  1963|  1964|  1965|  1966|  1967|  1968|  1969|  1970|  1971|  1972|  1973|  1974|  1975|  1976|  1977|  1978|  1979|1980|1981|1982|1983|1984|1985|1986|1987|1988|1989|1990|1991|1992|1993|1994|1995|1996|1997|1998|1999|2000|2001|2002|2003|2004|2005|2006|2007|2008|2009|2010|2011|2012|2013|2014|
+----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|1960|107840|  null|  null|  null|