In [1]:
!pip install pyarrow

Defaulting to user installation because normal site-packages is not writeable


In [2]:
!pip install numpy==1.26.4

Defaulting to user installation because normal site-packages is not writeable


In [3]:
import findspark

# Initialise findspark against the Spark installation under /opt/spark.
findspark.init(spark_home='/opt/spark')

In [4]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType

In [5]:
# Create (or reuse) a SparkSession with the master set to 'local'.
spark = (
    SparkSession.builder
    .master('local')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/08 16:35:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
!hdfs dfs -mkdir /tmp/crime

2025-01-08 16:35:29,608 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
!hdfs dfs -put crime.csv /tmp/crime

2025-01-08 16:35:32,957 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
# Read the crime CSV (first line is the header), left-pad the month to two characters
# with '0' and build a 'year_month' key by joining year and month with '-'.
crime = (
    spark.read
    .option('header', 'true')
    .csv('/tmp/crime/crime.csv')
    .withColumn('month', F.lpad(F.col('month'), 2, '0'))
    .withColumn('year_month', F.concat_ws('-', 'year', 'month'))
)

crime.show(5)

+---------------+------------+--------------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------+-----------+------------+--------------------+----------+
|INCIDENT_NUMBER|OFFENSE_CODE|  OFFENSE_CODE_GROUP| OFFENSE_DESCRIPTION|DISTRICT|REPORTING_AREA|SHOOTING|   OCCURRED_ON_DATE|YEAR|month|DAY_OF_WEEK|HOUR|  UCR_PART|     STREET|        Lat|        Long|            Location|year_month|
+---------------+------------+--------------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------+-----------+------------+--------------------+----------+
|     I182070945|       00619|             Larceny|  LARCENY ALL OTHERS|     D14|           808|    null|2018-09-02 13:00:00|2018|   09|     Sunday|  13|  Part One| LINCOLN ST|42.35779134|-71.13937053|(42.35779134, -71...|   2018-09|
|     I182070943|       01402|           Vandalism|           VA

# 1. Running total of incidents by month (over the whole period)

In [9]:
# Count the non-null incident numbers for every year_month value.
incidents_per_year_month = F.count('incident_number').alias('incident_sum')

df1 = (
    crime
    .groupBy('year_month')
    .agg(incidents_per_year_month)
    .select('year_month', 'incident_sum')
)

df1.show(5)

+----------+------------+
|year_month|incident_sum|
+----------+------------+
|   2017-09|        8940|
|   2017-10|        8846|
|   2016-02|        7308|
|   2018-06|        8834|
|   2017-05|        8715|
+----------+------------+
only showing top 5 rows



                                                                                

In [10]:
# Schema handed to the grouped-map UDF: the input columns plus the 'cum_sum' running total.
# All four fields are nullable.
res1_schema = (
    StructType()
    .add('year_month', StringType(), True)
    .add('incident_sum', LongType(), True)
    .add('cum_sum', LongType(), True)
    .add('pseudo_group', IntegerType(), True)
)

In [11]:
@pandas_udf(res1_schema, PandasUDFType.GROUPED_MAP)
def cumsum(pdf):
    """Add a running total of ``incident_sum`` to the rows of one group.

    The rows are first sorted by ``year_month`` in ascending order; the
    cumulative sum is then taken in that order and stored in a new
    ``cum_sum`` column.

    Args:
        pdf: pandas DataFrame holding the rows of one group; must contain
            the ``year_month`` and ``incident_sum`` columns.

    Returns:
        A sorted copy of ``pdf`` with the additional ``cum_sum`` column.
    """
    ordered = pdf.sort_values(by='year_month')
    return ordered.assign(cum_sum=ordered['incident_sum'].cumsum())

In [12]:
# Tag every row with the same constant so that a groupBy on 'pseudo_group'
# puts the whole table into a single group.
single_group_marker = F.lit(1)
df1 = df1.withColumn('pseudo_group', single_group_marker)

df1.show(5)

+----------+------------+------------+
|year_month|incident_sum|pseudo_group|
+----------+------------+------------+
|   2017-09|        8940|           1|
|   2017-10|        8846|           1|
|   2016-02|        7308|           1|
|   2018-06|        8834|           1|
|   2017-05|        8715|           1|
+----------+------------+------------+
only showing top 5 rows



In [13]:
# Run the cumsum UDF over the single pseudo group, then drop the helper column.
res1 = (
    df1
    .groupBy('pseudo_group')
    .apply(cumsum)
    .drop('pseudo_group')
)

res1.show(12)



+----------+------------+-------+
|year_month|incident_sum|cum_sum|
+----------+------------+-------+
|   2015-06|        4191|   4191|
|   2015-07|        8324|  12515|
|   2015-08|        8342|  20857|
|   2015-09|        8414|  29271|
|   2015-10|        8308|  37579|
|   2015-11|        7818|  45397|
|   2015-12|        7991|  53388|
|   2016-01|        7835|  61223|
|   2016-02|        7308|  68531|
|   2016-03|        8199|  76730|
|   2016-04|        8101|  84831|
|   2016-05|        8578|  93409|
+----------+------------+-------+
only showing top 12 rows



# 2. Running total of incidents by month, restarted for each year

In [14]:
# Count the non-null incident numbers for every (year, month) pair.
incidents_per_month = F.count('incident_number').alias('incident_sum')

df2 = (
    crime
    .groupBy('year', 'month')
    .agg(incidents_per_month)
    .select('year', 'month', 'incident_sum')
)

df2.show(5)

+----+-----+------------+
|year|month|incident_sum|
+----+-----+------------+
|2016|   05|        8578|
|2015|   09|        8414|
|2017|   08|        9206|
|2018|   07|        8538|
|2018|   01|        7782|
+----+-----+------------+
only showing top 5 rows



In [15]:
def cumsum(pdf):
    """Add a running total of ``incident_sum`` to the rows of one group.

    The rows are sorted by ``month`` in ascending order and the cumulative
    sum is taken in that order. The input frame is left unmodified.

    Args:
        pdf: pandas DataFrame with at least the ``month`` and
            ``incident_sum`` columns.

    Returns:
        A sorted copy of ``pdf`` with the additional ``cum_sum`` column.
    """
    by_month = pdf.sort_values(by='month')
    running_total = by_month['incident_sum'].cumsum()
    return by_month.assign(cum_sum=running_total)

In [16]:
# Per-year running total: the rows of each year are passed to `cumsum` as one pandas
# DataFrame, and the result is declared with the DDL schema string below.
res2 = (
    df2
    .groupBy('year')
    .applyInPandas(cumsum, 'year string, month string, incident_sum long, cum_sum long')
)

# Row order across groups is not guaranteed after applyInPandas, so sort explicitly
# for display; `res2` itself is left unsorted.
res2.orderBy('year', 'month').show(12)

+----+-----+------------+-------+
|year|month|incident_sum|cum_sum|
+----+-----+------------+-------+
|2015|   06|        4191|   4191|
|2015|   07|        8324|  12515|
|2015|   08|        8342|  20857|
|2015|   09|        8414|  29271|
|2015|   10|        8308|  37579|
|2015|   11|        7818|  45397|
|2015|   12|        7991|  53388|
|2016|   01|        7835|   7835|
|2016|   02|        7308|  15143|
|2016|   03|        8199|  23342|
|2016|   04|        8101|  31443|
|2016|   05|        8578|  40021|
+----+-----+------------+-------+
only showing top 12 rows



In [17]:
# Stop the SparkSession (`spark`) now that both results have been displayed.
spark.stop()