In [1]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pathlib import Path


In [2]:
sc = pyspark.SparkContext(appName="Homework")
sq = SQLContext(sc)

sc

21/08/09 08:07:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# sc.stop() 

#### `Load csv files to spark, using option header=True`

In [4]:
def read_dataframes(path = 'homework'):
    path = Path(path) 
    
    offense_codes_df = sq.read.option("header", "true")\
                         .csv(str(path / "offense_codes.csv"))
    
    crime_df = sq.read.option("header", "true")\
                 .csv(str(path / "crime.csv"))
    
    return offense_codes_df, crime_df

offense_codes_df, crime_df = read_dataframes()

offense_codes_df = offense_codes_df.withColumn("CODE", offense_codes_df["CODE"].cast(IntegerType()))
crime_df = crime_df.withColumn("OFFENSE_CODE", crime_df["OFFENSE_CODE"].cast(IntegerType()))

                                                                                

In [5]:
print("Before deleting duplicates {0} rows left".format(offense_codes_df.count()))

Before deleting duplicates 576 rows left


In [6]:
of_pd = offense_codes_df.toPandas()
crime_pd = crime_df.toPandas()

                                                                                

In [7]:
of_pd.head(2) 

Unnamed: 0,CODE,NAME
0,612,LARCENY PURSE SNATCH - NO FORCE
1,613,LARCENY SHOPLIFTING


In [8]:
crime_pd.head(2) 

Unnamed: 0,INCIDENT_NUMBER,OFFENSE_CODE,OFFENSE_CODE_GROUP,OFFENSE_DESCRIPTION,DISTRICT,REPORTING_AREA,SHOOTING,OCCURRED_ON_DATE,YEAR,MONTH,DAY_OF_WEEK,HOUR,UCR_PART,STREET,Lat,Long,Location
0,I182070945,619,Larceny,LARCENY ALL OTHERS,D14,808,,2018-09-02 13:00:00,2018,9,Sunday,13,Part One,LINCOLN ST,42.35779134,-71.13937053,"(42.35779134, -71.13937053)"
1,I182070943,1402,Vandalism,VANDALISM,C11,347,,2018-08-21 00:00:00,2018,8,Tuesday,0,Part Two,HECLA ST,42.30682138,-71.06030035,"(42.30682138, -71.06030035)"


###
`Deleting duplicates, getting first value before '-' literal from dataframe using spark sql`

In [10]:
offense_codes_df.createOrReplaceTempView("offense_codes_df")

offense_codes_df = sq.sql("SELECT CODE, MAX(NAME) as NAME FROM OFFENSE_CODES_DF GROUP BY CODE")

offense_codes_df = offense_codes_df.withColumn("NAME", split("NAME", '-')[0])
offense_codes_df.show()

print("After deleting duplicates {0} rows left".format(offense_codes_df.count()))

+----+--------------------+
|CODE|                NAME|
+----+--------------------+
| 243|               RAPE |
| 540|           BURGLARY |
| 623|LARCENY SHOPLIFTI...|
|1721|FAILURE TO REGIST...|
|1903|           GAMBLING |
|3704|       M/V ACCIDENT |
| 251|               RAPE |
|2622|KIDNAPPING/CUSTOD...|
| 804|            STALKING|
|2914|                VAL |
| 322|            ROBBERY |
| 321|            ROBBERY |
| 362|    ROBBERY ATTEMPT |
| 613|LARCENY SHOPLIFTI...|
| 633|LARCENY SHOPLIFTI...|
| 375|    ROBERRY ATTEMPT |
|1863|              DRUGS |
|2606|PRISONER ATTEMPT ...|
| 211|               RAPE |
| 530|             B&E NON|
+----+--------------------+
only showing top 20 rows





After deleting duplicates 425 rows left


                                                                                

### Считаем:
`crimes_total` - общее количество преступлений в этом районе,

`crimes_monthly` - медиана числа преступлений в месяц в этом районе

In [11]:
crime_df_distinct = crime_df.dropDuplicates(["DISTRICT", "INCIDENT_NUMBER","YEAR", "MONTH", "LAT", "LONG"])\
                            .select(["DISTRICT", "INCIDENT_NUMBER","YEAR", "MONTH", "LAT", "LONG"])

crime_df_distinct = crime_df_distinct.withColumn("MONTH", lpad("MONTH",2, '0'))
crime_df_distinct = crime_df_distinct.withColumn("DAY_F_DATE", lit("01"))
crime_df_distinct = crime_df_distinct.withColumn("MONTH_DATE", \
                                                 expr("make_date(YEAR, MONTH, DAY_F_DATE)"))

crimes_total_df = crime_df_distinct.groupBy("DISTRICT")\
                                    .agg(countDistinct("INCIDENT_NUMBER").alias("crimes_total")\
                                        ,avg("LAT").alias("lat")\
                                        ,avg("LONG").alias("long")\
                                        )
crimes_total_df.show()




# crime_df_distinct.groupBy("DISTRICT").agg(countDistinct("INCIDENT_NUMBER").alias("crimes_total")).show()



+--------+------------+------------------+-------------------+
|DISTRICT|crimes_total|               lat|               long|
+--------+------------+------------------+-------------------+
|      C6|       21196|42.214906864098275| -70.85942538777128|
|    null|        1583|25.052216289444456|-43.146343829444454|
|      B2|       43403| 42.31632986119104| -71.07629096717032|
|     C11|       37298| 42.29400030603795| -71.05347358106053|
|     E13|       15652| 42.30901127492181| -71.09714168641028|
|      B3|       31131| 42.28245324804027|  -71.0781482850731|
|      E5|       11876| 42.19984348527637| -71.00724221225452|
|     A15|        5978| 42.18506794525546| -70.75409373619097|
|      A7|       12306| 42.36280267789224| -71.00707817700783|
|     D14|       18573|42.342901024460545|  -71.1304138899989|
|      D4|       36755|42.341148104205345| -71.07690743408905|
|     E18|       15746| 42.26249531265002|  -71.1189841463501|
|      A1|       31020| 42.33064855453016| -71.01857752

                                                                                

In [12]:
crimes_monthly_df = crime_df_distinct.groupBy("DISTRICT","MONTH_DATE")\
                                     .agg(countDistinct("INCIDENT_NUMBER")\
                                     .alias("cnt_inc"))

crimes_monthly_df = crimes_monthly_df.groupBy("DISTRICT")\
                                     .agg(expr("percentile(cnt_inc, 0.5)")\
                                     .alias("crimes_monthly"))

crimes_monthly_df.show()

                                                                                

+--------+--------------+
|DISTRICT|crimes_monthly|
+--------+--------------+
|      C6|         543.0|
|    null|          37.5|
|      B2|        1131.5|
|     C11|         981.0|
|     E13|         397.5|
|      B3|         800.0|
|      E5|         304.0|
|     A15|         150.5|
|      A7|         316.5|
|     D14|         474.5|
|      D4|         970.5|
|     E18|         399.5|
|      A1|         774.5|
+--------+--------------+



                                                                                

### Подготовка freq_crimes_df  - 3 наиболее распространненых типа преступления в каждом районе

In [13]:
freq_crimes_df = crime_df.dropDuplicates(["DISTRICT", "INCIDENT_NUMBER","OFFENSE_CODE"])\
                         .select(["DISTRICT", "INCIDENT_NUMBER", "OFFENSE_CODE"])

freq_crimes_df = freq_crimes_df.join(offense_codes_df, \
                                     freq_crimes_df["offense_code"] == offense_codes_df["code"], \
                                     'left')

freq_crimes_df = freq_crimes_df.groupBy("DISTRICT", "NAME")\
                               .agg(count("INCIDENT_NUMBER")\
                               .alias("CNT_BY_NAME"))

window_spec = Window.partitionBy("DISTRICT").orderBy(col("CNT_BY_NAME").desc())

freq_crimes_df = freq_crimes_df.withColumn("dense_rank",dense_rank().over(window_spec))

freq_crimes_df = freq_crimes_df.filter("dense_rank <= 3")

freq_crimes_df = freq_crimes_df.groupby("DISTRICT").agg(collect_list('NAME').alias('frequent_crime_types'))


In [14]:
freq_crimes_df.show()



+--------+--------------------+
|DISTRICT|frequent_crime_types|
+--------+--------------------+
|      C6|[DRUGS , SICK/INJ...|
|    null|[M/V ACCIDENT , M...|
|      B2|[M/V , M/V ACCIDE...|
|     C11|[M/V , SICK/INJUR...|
|     E13|[SICK/INJURED/MED...|
|      B3|[VERBAL DISPUTE, ...|
|      E5|[SICK/INJURED/MED...|
|     A15|[M/V ACCIDENT , I...|
|      A7|[SICK/INJURED/MED...|
|     D14|[TOWED MOTOR VEHI...|
|      D4|[LARCENY SHOPLIFT...|
|     E18|[SICK/INJURED/MED...|
|      A1|[PROPERTY , ASSAU...|
+--------+--------------------+



                                                                                

### Сборка финального датасета в один датафрейм

In [15]:
crimes_total_df1 = crimes_total_df.join(crimes_monthly_df\
                                       ,["DISTRICT"]\
                                       ,'inner')

crimes_total_df1 = crimes_total_df1.join(freq_crimes_df\
                                       ,["DISTRICT"]\
                                       ,'inner')


In [16]:
crimes_total_df1.show()



+--------+------------+------------------+------------------+--------------+--------------------+
|DISTRICT|crimes_total|               lat|              long|crimes_monthly|frequent_crime_types|
+--------+------------+------------------+------------------+--------------+--------------------+
|      C6|       21196|42.214906864098275|-70.85942538777128|         543.0|[DRUGS , SICK/INJ...|
|      B2|       43403| 42.31632986119104|-71.07629096717032|        1131.5|[M/V , M/V ACCIDE...|
|     C11|       37298| 42.29400030603795|-71.05347358106053|         981.0|[M/V , SICK/INJUR...|
|     E13|       15652| 42.30901127492181|-71.09714168641028|         397.5|[SICK/INJURED/MED...|
|      B3|       31131| 42.28245324804027| -71.0781482850731|         800.0|[VERBAL DISPUTE, ...|
|      E5|       11876| 42.19984348527637|-71.00724221225452|         304.0|[SICK/INJURED/MED...|
|     A15|        5978| 42.18506794525546|-70.75409373619097|         150.5|[M/V ACCIDENT , I...|
|      A7|       123

                                                                                

In [17]:
def write_result_parquet(df = crimes_total_df1, path = 'homework/result'):
    path = Path(path) 
    offense_codes_df = df.repartition(1).write.mode('overwrite').parquet(str(path / "parquet"))
    
write_result_parquet()

                                                                                

### Проверяем записанный паркет

In [18]:
sq.read.parquet("homework/result/parquet").show()

+--------+------------+------------------+------------------+--------------+--------------------+
|DISTRICT|crimes_total|               lat|              long|crimes_monthly|frequent_crime_types|
+--------+------------+------------------+------------------+--------------+--------------------+
|      C6|       21196|42.214906864098275|-70.85942538777128|         543.0|[DRUGS , SICK/INJ...|
|      B2|       43403| 42.31632986119104|-71.07629096717032|        1131.5|[M/V , M/V ACCIDE...|
|     C11|       37298| 42.29400030603795|-71.05347358106053|         981.0|[M/V , SICK/INJUR...|
|     E13|       15652| 42.30901127492181|-71.09714168641028|         397.5|[SICK/INJURED/MED...|
|      B3|       31131| 42.28245324804027| -71.0781482850731|         800.0|[VERBAL DISPUTE, ...|
|      E5|       11876| 42.19984348527637|-71.00724221225452|         304.0|[SICK/INJURED/MED...|
|     A15|        5978| 42.18506794525546|-70.75409373619097|         150.5|[M/V ACCIDENT , I...|
|      A7|       123