##  Aggregations


### Step 1: Initialize PySpark Session


In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct, approx_count_distinct,first, last, min, max,sum,sumDistinct,avg,col,desc,asc,struct
from pyspark.sql.window import Window
# Create a Spark session

spark = SparkSession.builder.appName("day4").getOrCreate()


23/09/03 18:50:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Step 2: Load the Dataset


In [34]:
# Load the Chipotle dataset into a Spark DataFrame
data_path = "US_Crime_Rates_1960_2014.csv"  # Replace with the actual path
US_Crime_Rates_1960_2014_df = spark.read.csv(data_path, header=True, inferSchema=True)

# Load the Chipotle dataset into a Spark DataFrame
data_path = "titanic.csv"  # Replace with the actual path
titanic_df = spark.read.csv(data_path, header=True, inferSchema=True)


In [5]:
US_Crime_Rates_1960_2014_df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Total: integer (nullable = true)
 |-- Violent: integer (nullable = true)
 |-- Property: integer (nullable = true)
 |-- Murder: integer (nullable = true)
 |-- Forcible_Rape: integer (nullable = true)
 |-- Robbery: integer (nullable = true)
 |-- Aggravated_assault: integer (nullable = true)
 |-- Burglary: integer (nullable = true)
 |-- Larceny_Theft: integer (nullable = true)
 |-- Vehicle_Theft: integer (nullable = true)



In [6]:
US_Crime_Rates_1960_2014_df.show()

+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+
|Year|Population|   Total|Violent|Property|Murder|Forcible_Rape|Robbery|Aggravated_assault|Burglary|Larceny_Theft|Vehicle_Theft|
+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+
|1960| 179323175| 3384200| 288460| 3095700|  9110|        17190| 107840|            154320|  912100|      1855400|       328200|
|1961| 182992000| 3488000| 289390| 3198600|  8740|        17220| 106670|            156760|  949600|      1913000|       336000|
|1962| 185771000| 3752200| 301510| 3450700|  8530|        17550| 110860|            164570|  994300|      2089600|       366800|
|1963| 188483000| 4109500| 316970| 3792500|  8640|        17650| 116470|            174210| 1086400|      2297800|       408300|
|1964| 191141000| 4564600| 364220| 4200400|  9360|        21420| 130390|            203050| 12132

In [52]:
#create temp table for sql
US_Crime_Rates_1960_2014_df.createOrReplaceTempView('us_crime')

### count

Question: How many records are there in the US_Crime_Rates_1960_2014_df DataFrame?

In [42]:
#pyspark

#counts the number of records in US_Crime_Rates_1960_2014 
count=US_Crime_Rates_1960_2014_df.count()
print("Number of Count:", count)


Number of Count: 55


In [53]:
#sql

count = spark.sql("""
                      select count(*) as number_of_count
                      from us_crime
                     """)

# count_sql.show()
print("Number of Count:", count.collect()[0][0])


Number of Count: 55


### countDistinct
Question: How many distinct years are present in the US_Crime_Rates_1960_2014_df DataFrame?
Answer:

In [8]:
#pyspark

#counts distinct years
distinct_count=US_Crime_Rates_1960_2014_df.select(countDistinct("Year"))
print("Number of distinct years:",distinct_count.collect()[0][0])

Number of distinct years: 55


In [55]:
#sql

distinct_count = spark.sql("""
                        select count(distinct 'year') as distinct_count_of_years
                       from us_crime
                         """)   
# distinct_count.show()
print("Number of distinct years:",distinct_count.collect()[0][0])

#the output is wrong


Number of distinct years: 1


### approx_count_distinct

Question: Estimate the approximate number of distinct values in the "Total" column of the US_Crime_Rates_1960_2014_df DataFrame.

In [9]:
#pyspark

#the approximate number of distinct values in the "Total" column 
US_Crime_Rates_1960_2014_df.select(approx_count_distinct("Total", 0.1)).show()

+----------------------------+
|approx_count_distinct(Total)|
+----------------------------+
|                          51|
+----------------------------+



In [56]:
#sql
#the approximate number of distinct values in the "Total" column 

#returns a df
total_count = spark.sql(""" 
                        select approx_count_distinct(Total,0.1)
                        from us_crime

                        """) 
total_count.show()



+----------------------------+
|approx_count_distinct(Total)|
+----------------------------+
|                          51|
+----------------------------+



###  first and last

Question: Find the first and last year in the US_Crime_Rates_1960_2014_df DataFrame.

In [57]:
#pyspark

#got first and last year from us_crime_rates_1960_2014 using first and last functions
first_year_df=US_Crime_Rates_1960_2014_df.select(first("Year"))
last_year_df=US_Crime_Rates_1960_2014_df.select(last("Year"))

print("First year:",first_year_df.collect()[0][0])
print("Last year:",last_year_df.collect()[0][0])

First year: 1960
Last year: 2014


In [60]:
#sql

#got first and last year from us_crime_rates_1960_2014 using first and last functions

first_last = spark.sql("""
                        select first(Year), last(Year)
                       from us_crime
                        """)
# first_last.show()


#print result
print("First year:",first_last.collect()[0][0])
print("Last year:",first_last.collect()[0][1])

First year: 1960
Last year: 2014


### min and max

Question: Find the minimum and maximum population values in the US_Crime_Rates_1960_2014_df DataFrame.

In [11]:
#pyspark

#got min and max population from us_crime_rates_1960_2014 using first and last functions

min_df=US_Crime_Rates_1960_2014_df.select(min("population"))
max_df=US_Crime_Rates_1960_2014_df.select(max("population"))
print("Minimum population::",min_df.collect()[0][0])
print("Maximum population: ",max_df.collect()[0][0])


Minimum population:: 179323175
Maximum population:  318857056


In [62]:
#sql

min_max = spark.sql("""
                     select min(population), max(population)
                    from us_crime            
                     """)
# max_min.show()
print("Minimum population::",min_max.collect()[0][0])
print("Maximum population: ",min_max.collect()[0][1])

Minimum population:: 179323175
Maximum population:  318857056


### sumDistinct

Question: Calculate the sum of distinct "Property" values for each year in the US_Crime_Rates_1960_2014_df DataFrame.

In [16]:
#pyspark

#used sumDistinct function to calculated sum of distinct propery grouped by year
US_Crime_Rates_1960_2014_df.groupBy("Year").agg(sumDistinct(col("Property")).alias("SumDistinctProperty")).orderBy(col('year')).show()




+----+-------------------+
|Year|SumDistinctProperty|
+----+-------------------+
|1960|            3095700|
|1961|            3198600|
|1962|            3450700|
|1963|            3792500|
|1964|            4200400|
|1965|            4352000|
|1966|            4793300|
|1967|            5403500|
|1968|            6125200|
|1969|            6749000|
|1970|            7359200|
|1971|            7771700|
|1972|            7413900|
|1973|            7842200|
|1974|            9278700|
|1975|           10252700|
|1976|           10345500|
|1977|            9955000|
|1978|           10123400|
|1979|           11041500|
+----+-------------------+
only showing top 20 rows



In [64]:
#sql

sum_distinct_property= spark.sql("""
                                    select Year, sum(Property)
                                    from us_crime
                                    group by Year
                                    order by Year
                                         """)
sum_distinct_property.show()


+----+-------------+
|Year|sum(Property)|
+----+-------------+
|1960|      3095700|
|1961|      3198600|
|1962|      3450700|
|1963|      3792500|
|1964|      4200400|
|1965|      4352000|
|1966|      4793300|
|1967|      5403500|
|1968|      6125200|
|1969|      6749000|
|1970|      7359200|
|1971|      7771700|
|1972|      7413900|
|1973|      7842200|
|1974|      9278700|
|1975|     10252700|
|1976|     10345500|
|1977|      9955000|
|1978|     10123400|
|1979|     11041500|
+----+-------------+
only showing top 20 rows



### avg

Question: Calculate the average "Murder" rate for the entire dataset in the US_Crime_Rates_1960_2014_df DataFrame.
Answer:

In [13]:
#pyspark

#calculate avg murder rate
Avg_murder=US_Crime_Rates_1960_2014_df.select(avg("Murder"))
print("Average murder rate:",Avg_murder.collect()[0][0])

Average murder rate: 17317.236363636363


In [66]:
#sql

Avg_murder = spark.sql("""
                            select avg(murder) as average
                           from us_crime
                            """)
# Avg_murder.show()

print("Average murder rate:",Avg_murder.collect()[0][0])

Average murder rate: 17317.236363636363


### Aggregating to Complex Types

Question: Calculate the total sum of "Violent" and "Property" crimes for each year in the US_Crime_Rates_1960_2014_df DataFrame. Store the results in a struct type column.

In [21]:
#pyspark

# Calculate the total sum of "Violent" and "Property" crimes for each year
#Created struct for two columns and named it CrimeTotals
result = US_Crime_Rates_1960_2014_df.groupBy("Year").agg(
    struct(
        sum(col("Violent")).alias("TotalViolent"),
        sum(col("Property")).alias("TotalProperty")
    ).alias("CrimeTotals")
).orderBy("Year")


result.show()




+----+-------------------+
|Year|        CrimeTotals|
+----+-------------------+
|1960|  {288460, 3095700}|
|1961|  {289390, 3198600}|
|1962|  {301510, 3450700}|
|1963|  {316970, 3792500}|
|1964|  {364220, 4200400}|
|1965|  {387390, 4352000}|
|1966|  {430180, 4793300}|
|1967|  {499930, 5403500}|
|1968|  {595010, 6125200}|
|1969|  {661870, 6749000}|
|1970|  {738820, 7359200}|
|1971|  {816500, 7771700}|
|1972|  {834900, 7413900}|
|1973|  {875910, 7842200}|
|1974|  {974720, 9278700}|
|1975|{1039710, 10252700}|
|1976|{1004210, 10345500}|
|1977| {1029580, 9955000}|
|1978|{1085550, 10123400}|
|1979|{1208030, 11041500}|
+----+-------------------+
only showing top 20 rows



In [67]:
#sql

result = spark.sql("""
    select Year,
           STRUCT(SUM(Violent), SUM(Property)) AS TotalCrimes
    from us_crime
    group by  Year
    order by Year
""")
                               
result.show()

+----+-------------------+
|Year|        TotalCrimes|
+----+-------------------+
|1960|  {288460, 3095700}|
|1961|  {289390, 3198600}|
|1962|  {301510, 3450700}|
|1963|  {316970, 3792500}|
|1964|  {364220, 4200400}|
|1965|  {387390, 4352000}|
|1966|  {430180, 4793300}|
|1967|  {499930, 5403500}|
|1968|  {595010, 6125200}|
|1969|  {661870, 6749000}|
|1970|  {738820, 7359200}|
|1971|  {816500, 7771700}|
|1972|  {834900, 7413900}|
|1973|  {875910, 7842200}|
|1974|  {974720, 9278700}|
|1975|{1039710, 10252700}|
|1976|{1004210, 10345500}|
|1977| {1029580, 9955000}|
|1978|{1085550, 10123400}|
|1979|{1208030, 11041500}|
+----+-------------------+
only showing top 20 rows



### Grouping

Question: In the given US_Crime_Rates_1960_2014_df DataFrame, you are tasked with finding the average of all crimes combined for each year. Calculate the sum of all crime categories (Violent, Property, Murder, Forcible_Rape, Robbery, Aggravated_assault, Burglary, Larceny_Theft, Vehicle_Theft) for each year and then determine the average of these combined crime sums. Provide the result as the average of all crimes across the entire dataset.

In [25]:
# columns=US_Crime_Rates_1960_2014_df.columns
# new_df=US_Crime_Rates_1960_2014_df.withColumn("rowsum",sum(col(column) for column in columns[1:]))
# new_df.show()

#pyspark

#sum of different columns in each row
total_crime_sum = US_Crime_Rates_1960_2014_df.withColumn("TotalCrimeSum",
                                                    col('violent') + col('property') + col('murder') + col('forcible_rape') + 
                                                    col('robbery') + col('Aggravated_assault') + col('Burglary')+ col('Larceny_Theft')
                                                    + col('Vehicle_Theft'))

#avaerage of calculated sum
avegare_crime = total_crime_sum.agg(avg('TotalCrimeSum'))
print("Average of all crime:", avegare_crime.collect()[0][0])
total_crime_sum.select('year','TotalCrimeSum').show()


Average of all crime: 21201546.145454545
+----+-------------+
|year|TotalCrimeSum|
+----+-------------+
|1960|      6768320|
|1961|      6975980|
|1962|      7504420|
|1963|      8218940|
|1964|      9129240|
|1965|      9478780|
|1966|     10446960|
|1967|     11806860|
|1968|     13440420|
|1969|     14821740|
|1970|     16196040|
|1971|     17176400|
|1972|     16497600|
|1973|     17436220|
|1974|     20506940|
|1975|     22584730|
|1976|     22699410|
|1977|     21969060|
|1978|     22417910|
|1979|     24499060|
+----+-------------+
only showing top 20 rows



In [72]:
#sql

total_crime_sum = spark.sql(""" 
                      SELECT *,
                            (Violent + Property + Murder + Forcible_rape + Robbery +
                            Aggravated_assault + Burglary + Larceny_Theft + Vehicle_Theft) AS TotalCrimeSum
                     FROM us_crime
                    """)

# total_crime_sum.show()

average_crime = total_crime_sum.agg(avg('TotalCrimeSum'))
print("Average of all crime:", avegare_crime.collect()[0][0])
total_crime_sum.select('year','TotalCrimeSum').show()


Average of all crime: 21201546.145454545
+----+-------------+
|year|TotalCrimeSum|
+----+-------------+
|1960|      6768320|
|1961|      6975980|
|1962|      7504420|
|1963|      8218940|
|1964|      9129240|
|1965|      9478780|
|1966|     10446960|
|1967|     11806860|
|1968|     13440420|
|1969|     14821740|
|1970|     16196040|
|1971|     17176400|
|1972|     16497600|
|1973|     17436220|
|1974|     20506940|
|1975|     22584730|
|1976|     22699410|
|1977|     21969060|
|1978|     22417910|
|1979|     24499060|
+----+-------------+
only showing top 20 rows



### Window Functions

Question: Calculate the cumulative sum of "Property" values over the years using a window function in the US_Crime_Rates_1960_2014_df DataFrame.

In [73]:
#PYSPARK

# Define a window specification
window_spec = Window.partitionBy("Year").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Calculate the cumulative sum of "Property" values over the years
US_Crime_Rates_1960_2014_df_window = US_Crime_Rates_1960_2014_df.withColumn(
    "CumulativePropertySum", sum(col("Property")).over(window_spec)
)

US_Crime_Rates_1960_2014_df_window.show()


+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+---------------------+
|Year|Population|   Total|Violent|Property|Murder|Forcible_Rape|Robbery|Aggravated_assault|Burglary|Larceny_Theft|Vehicle_Theft|CumulativePropertySum|
+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+---------------------+
|1960| 179323175| 3384200| 288460| 3095700|  9110|        17190| 107840|            154320|  912100|      1855400|       328200|              3095700|
|1961| 182992000| 3488000| 289390| 3198600|  8740|        17220| 106670|            156760|  949600|      1913000|       336000|              3198600|
|1962| 185771000| 3752200| 301510| 3450700|  8530|        17550| 110860|            164570|  994300|      2089600|       366800|              3450700|
|1963| 188483000| 4109500| 316970| 3792500|  8640|        17650| 116470|            174210| 10

In [74]:
#sql

sql = spark.sql("""
                    select *,
                    sum(property) OVER (order by Year ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS CumulativePropertySum
                    from us_crime
                        """)

sql.show()


+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+---------------------+
|Year|Population|   Total|Violent|Property|Murder|Forcible_Rape|Robbery|Aggravated_assault|Burglary|Larceny_Theft|Vehicle_Theft|CumulativePropertySum|
+----+----------+--------+-------+--------+------+-------------+-------+------------------+--------+-------------+-------------+---------------------+
|1960| 179323175| 3384200| 288460| 3095700|  9110|        17190| 107840|            154320|  912100|      1855400|       328200|              3095700|
|1961| 182992000| 3488000| 289390| 3198600|  8740|        17220| 106670|            156760|  949600|      1913000|       336000|              6294300|
|1962| 185771000| 3752200| 301510| 3450700|  8530|        17550| 110860|            164570|  994300|      2089600|       366800|              9745000|
|1963| 188483000| 4109500| 316970| 3792500|  8640|        17650| 116470|            174210| 10

23/09/03 19:21:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 19:21:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 19:21:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 19:21:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/03 19:21:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


### Pivot
Question: You are working with a DataFrame named US_Crime_Rates_1960_2014_df that contains crime data for different crime types over the years. 

In [None]:
#pyspark

#piovot df created to show value of robbery in each year
pivoted = US_Crime_Rates_1960_2014_df.groupBy("Year").pivot("Year").sum("robbery").sort(asc('Year'))
pivoted.show()

+----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|Year|  1960|  1961|  1962|  1963|  1964|  1965|  1966|  1967|  1968|  1969|  1970|  1971|  1972|  1973|  1974|  1975|  1976|  1977|  1978|  1979|1980|1981|1982|1983|1984|1985|1986|1987|1988|1989|1990|1991|1992|1993|1994|1995|1996|1997|1998|1999|2000|2001|2002|2003|2004|2005|2006|2007|2008|2009|2010|2011|2012|2013|2014|
+----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|1960|107840|  null|  null|  null|

In [75]:
#spark

# sql_query = """
#     select Year,
#            SUM(CASE WHEN Year = 1960 THEN robbery ELSE NULL END) AS `1960`,
#            SUM(CASE WHEN Year = 1961 THEN robbery ELSE NULL END) AS `1961`,
#            SUM(CASE WHEN Year = 1962 THEN robbery ELSE NULL END) AS `1962`,
#            --we have to write this for all years--
#            SUM(CASE WHEN Year = 2014 THEN robbery ELSE NULL END) AS `2014`
#     from us_crime
#     group BY Year
#     order BY Year ASC
# """

# pivoted = spark.sql("""
#     SELECT *
#     FROM (
#         SELECT Year, robbery
#         FROM crime_rates
#     )
#     PIVOT (
#         SUM(robbery)
#         FOR Year IN (1960, 1961, 1962, ... , 2014)
#     )
#     ORDER BY Year ASC
# """)

#THE QUERY BECOMES VERY LONG