
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, S3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0                                                        |
| %security_configuration     |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |

In [None]:
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
It looks like there is a newer version of the kernel available. The latest version is 0.31 and you have 0.30 installed.
Please run `pip install --upgrade aws-glue-sessions` to upgrade your kernel
Previous number of workers: 5
Setting new number of workers to: 2
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::687494451982:role/Bootcamp-Glue-Notebook
Attempting to use existing AssumeRole session credentials.
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 2
Session ID: 8b7ff77e-b641-4dce-85e3-009e714587b3
Applying the following default arguments:
--glue_kernel_version 0.30
--enable-glue-datacatalog tr

In [2]:
# Crime catalog
crimeCatalog = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://big-data-bootcamp-2022/week2/day4/resources/SacramentocrimeJanuary2006.csv"]
    },
    format_options={
        "withHeader": True,
        "separator": ","
    }).toDF()

# Crime clarification
crimeClarification = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://big-data-bootcamp-2022/week2/day4/resources/ucr_ncic_codes.tsv"]
    },
    format_options={
        "withHeader": True,
        "separator": "\t"
    }).toDF()




### Write your solution under each cell using input DataFrames - crimeCatalog and crimeClarification.
### For each task a code snippet needs to be written to achieve the task given. You can also add comments from your side if you need to.

In [1]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("CrimeAnalyse").getOrCreate()

24/01/22 17:53:37 WARN Utils: Your hostname, Mewans-MacBook-Air-M1.local resolves to a loopback address: 127.0.0.1; using 192.168.201.38 instead (on interface en0)
24/01/22 17:53:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/22 17:53:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Import data from SacramentocrimeJanuary2006.csv

In [5]:
crimeCatalog = spark.read.csv("SacramentocrimeJanuary2006.csv", header=True, inferSchema=True)

In [8]:
crimeCatalog.show(5)

+-----------+-------------------+--------+----------+----+--------------------+-------------+-----------+------------+
|  cdatetime|            address|district|      beat|grid|          crimedescr|ucr_ncic_code|   latitude|   longitude|
+-----------+-------------------+--------+----------+----+--------------------+-------------+-----------+------------+
|1/1/06 0:00| 3108 OCCIDENTAL DR|       3|3C        |1115|10851(A)VC TAKE V...|         2404|38.55042047|-121.3914158|
|1/1/06 0:00|2082 EXPEDITION WAY|       5|5A        |1512|459 PC  BURGLARY ...|         2204|38.47350069|-121.4901858|
|1/1/06 0:00|         4 PALEN CT|       2|2A        | 212|10851(A)VC TAKE V...|         2404|38.65784584|-121.4621009|
|1/1/06 0:00|     22 BECKFORD CT|       6|6C        |1443|476 PC PASS FICTI...|         2501|38.50677377|-121.4269508|
|1/1/06 0:00|   3421 AUBURN BLVD|       2|2A        | 508|459 PC  BURGLARY-...|         2299| 38.6374478|-121.3846125|
+-----------+-------------------+--------+----------+----+--------------------+-------------+-----------+------------+

In [None]:
# Import data from ucr_ncic_codes.tsv

In [9]:
crimeClarification = spark.read.option("delimiter", "\t").csv("ucr_ncic_codes.tsv", header=True, inferSchema=True)

In [10]:
crimeClarification.show(5)

+----------+-------------------+--------------------+------------------+----------+------------------------------------+
|Crime Code|Crime Category Code|NCIC Charge Category|      Crime Charge|Parameters|ICE Criminal Offense Level (Default)|
+----------+-------------------+--------------------+------------------+----------+------------------------------------+
|      0101|                SOV|         Sovereignty|           Treason|      NULL|                             Level 1|
|      0102|                SOV|         Sovereignty|Treason Misprision|      NULL|                             Level 1|
|      0103|                SOV|         Sovereignty|         Espionage|      NULL|                             Level 1|
|      0104|                SOV|         Sovereignty|          Sabotage|      NULL|                             Level 1|
|      0105|                SOV|         Sovereignty|          Sedition|      NULL|                             Level 1|
+----------+-------------------+--------------------+------------------+----------+------------------------------------+
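The `Crime Code` values in this output keep their leading zeros (`0101`), which suggests the column is being treated as text. As a small cross-check outside Spark, the sketch below reads a few tab-separated rows with Python's standard `csv` module. The in-memory sample and the three columns chosen are assumptions for illustration; only the values are copied from the printed rows.

```python
import csv
import io

# Three rows copied from the crimeClarification.show(5) output, tab-separated.
# Only three of the six columns are included to keep the sample short.
sample_tsv = (
    "Crime Code\tCrime Category Code\tCrime Charge\n"
    "0101\tSOV\tTreason\n"
    "0102\tSOV\tTreason Misprision\n"
    "0103\tSOV\tEspionage\n"
)

# DictReader uses the first line as the header and keeps every field as text,
# so leading zeros in the codes are preserved.
rows = list(csv.DictReader(io.StringIO(sample_tsv), delimiter="\t"))

for row in rows:
    print(row["Crime Code"], row["Crime Category Code"], row["Crime Charge"])
```

Keeping codes as text matters later when they are used as join keys against a numeric column.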

In [11]:
# Task 1. Print unique districts sorted by name (the contents of the district column).
unique_districts = crimeCatalog.select("district").distinct().orderBy("district")

In [12]:
unique_districts.show(5)

+--------+
|district|
+--------+
|       1|
|       2|
|       3|
|       4|
|       5|
+--------+
only showing top 5 rows



In [15]:
from pyspark.sql.functions import col

In [16]:
# Task 2. In which district did the most crimes occur?
most_crime_occurred_district = crimeCatalog.groupBy("district").count().orderBy(col("count").desc()).first()["district"]
print(f"The district with the most crimes is: {most_crime_occurred_district}")

The district with the most crimes is: 3


In [None]:
# Task 3. What is the most dangerous time of day?
#      * Morning 6:00-12:00
#      * Day 12:00-18:00
#      * Evening 18:00-00:00
#      * Night 00:00-6:00
#      * You might want to split the cdatetime column into two separate columns (date and time) for this task


In [26]:
# Import the functions needed for this task
from pyspark.sql.functions import hour, split, when

In [27]:
# Split the cdatetime column so the most dangerous time of day can be determined.
# Split on the space: the date is the first part and the time is the second part.
dataSplited = crimeCatalog.withColumn("datetime_split", split(crimeCatalog["cdatetime"], " "))

In [31]:
# Reuse the same variable and store the date and time parts in two new columns
dataSplited = dataSplited.withColumn("date", dataSplited["datetime_split"][0])
dataSplited = dataSplited.withColumn("time", dataSplited["datetime_split"][1])

In [34]:
# show dataSplited
dataSplited.show(1)

+-----------+------------------+--------+----------+----+--------------------+-------------+-----------+------------+------+----+-----------+--------------+
|  cdatetime|           address|district|      beat|grid|          crimedescr|ucr_ncic_code|   latitude|   longitude|  date|time|time_of_day|datetime_split|
+-----------+------------------+--------+----------+----+--------------------+-------------+-----------+------------+------+----+-----------+--------------+
|1/1/06 0:00|3108 OCCIDENTAL DR|       3|3C        |1115|10851(A)VC TAKE V...|         2404|38.55042047|-121.3914158|1/1/06|0:00|    Unknown|[1/1/06, 0:00]|
+-----------+------------------+--------+----------+----+--------------------+-------------+-----------+------------+------+----+-----------+--------------+
only showing top 1 row



In [35]:
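# Define the time categories as (condition, label) pairs.
# between() is inclusive on both ends, so hours 6 to 11 cover 6:00-11:59, and so on.
time_categories = [
    (hour("time").between(6, 11), "Morning"),
    (hour("time").between(12, 17), "Day"),
    (hour("time").between(18, 23), "Evening"),
    (hour("time").between(0, 5), "Night")
]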

In [36]:
# Using when and otherwise to categorize the time of day
crimeCatalog = dataSplited.withColumn("time_of_day", 
                                       when(time_categories[0][0], time_categories[0][1])
                                       .when(time_categories[1][0], time_categories[1][1])
                                       .when(time_categories[2][0], time_categories[2][1])
                                       .when(time_categories[3][0], time_categories[3][1])
                                       .otherwise("Unknown"))

In [37]:
# Counting the occurrences for each time category
most_dangerous_time = crimeCatalog.groupBy("time_of_day").count().orderBy(col("count").desc()).first()["time_of_day"]
print(f"The most dangerous time of day is: {most_dangerous_time}")

The most dangerous time of day is: Day
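The bucketing above relies on `hour()` interpreting a bare `H:MM` string. As a hedged cross-check, the plain-Python sketch below applies the same four ranges by parsing the hour explicitly, which makes it easy to verify the boundaries on a few sample values. The function name `time_of_day` and the sample list are illustrative assumptions, not part of the notebook.

```python
def time_of_day(time_str):
    """Map an 'H:MM' string such as '0:00' or '17:45' to a time-of-day label."""
    try:
        hour = int(time_str.split(":")[0])
    except (AttributeError, ValueError):
        return "Unknown"  # missing or malformed time value
    if 6 <= hour < 12:
        return "Morning"
    if 12 <= hour < 18:
        return "Day"
    if 18 <= hour < 24:
        return "Evening"
    if 0 <= hour < 6:
        return "Night"
    return "Unknown"  # hour outside 0-23


# Boundary values for each bucket, plus two invalid inputs.
sample_times = ["0:00", "5:59", "6:00", "11:30", "12:00", "17:59", "18:00", "23:59", None, "abc"]
for t in sample_times:
    print(t, "->", time_of_day(t))
```

If a spot check like this disagrees with the `time_of_day` column for the same `time` value, the implicit string-to-timestamp conversion is the first thing to inspect.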


In [39]:
from pyspark.sql.functions import count

In [40]:
# Task 4. Count crimes per day.
crimes_per_day = crimeCatalog.groupBy("date").agg(count("date").alias("crimes_count"))
crimes_per_day.show()

+-------+------------+
|   date|crimes_count|
+-------+------------+
|1/14/06|         207|
|1/18/06|         295|
|1/24/06|         277|
|1/17/06|         282|
| 1/3/06|         256|
| 1/5/06|         268|
|1/22/06|         204|
|1/26/06|         254|
|1/31/06|         245|
|1/23/06|         241|
| 1/8/06|         182|
|1/11/06|         311|
|1/30/06|         266|
|1/25/06|         249|
|1/12/06|         265|
| 1/1/06|         250|
|1/21/06|         213|
|1/29/06|         182|
|1/19/06|         266|
|1/27/06|         279|
+-------+------------+
only showing top 20 rows
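The rows above come back in no particular order because `date` is a plain `M/D/YY` string. The sketch below shows one way to get a chronological listing: parse each string into a real date before counting and sorting. It uses plain Python on a made-up sample; `parse_date` and `sample_dates` are hypothetical names, and the same idea would apply in Spark by converting the column to a date type before ordering.

```python
from collections import Counter
from datetime import datetime

# Made-up sample of date strings in the same M/D/YY style as the date column.
sample_dates = ["1/14/06", "1/3/06", "1/14/06", "1/1/06", "1/3/06", "1/14/06"]


def parse_date(text):
    """Parse an 'M/D/YY' string into a datetime.date."""
    return datetime.strptime(text, "%m/%d/%y").date()


# Count crimes per parsed date, then sort by the date itself rather than by the string.
counts = Counter(parse_date(d) for d in sample_dates)
crimes_per_day_sorted = sorted(counts.items())

for day, n in crimes_per_day_sorted:
    print(day.isoformat(), n)
```

Sorting the raw strings instead would place `1/14/06` before `1/3/06`, which is why the parse step comes first.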



In [46]:
# Task 5. Count crimes per day per Charge Category.
# NOTE: Crime levels can be found in ucr_ncic_codes.tsv

# Join crimeCatalog with crimeClarification (ucr_ncic_codes.tsv) to get the Charge Category.
# A left outer join is used because crimeCatalog is the main dataset and crimeClarification
# may not contain a match for every code; unmatched crimes are kept instead of being dropped.
joined_both = crimeCatalog.join(crimeClarification, crimeCatalog["ucr_ncic_code"] == crimeClarification["Crime Code"], "left_outer")

# Group by date, code, and Crime Charge, then count the occurrences
crimes_per_day_per_category = joined_both.groupBy("date", "ucr_ncic_code", "Crime Charge").agg(count("*").alias("crimes_count"))

crimes_per_day_per_category.show()

+-------+-------------+--------------------+------------+
|   date|ucr_ncic_code|        Crime Charge|crimes_count|
+-------+-------------+--------------------+------------+
| 1/8/06|         2399|             Larceny|           9|
|1/18/06|         1299|             Robbery|          10|
|1/24/06|         3572|Amphetamine - Pos...|           2|
|1/25/06|         2803|Receive Stolen Pr...|           1|
|1/28/06|         5401|         Hit and Run|           7|
| 1/1/06|         2299|            Burglary|          18|
| 1/8/06|         5207|Incendiary Device...|           1|
|1/10/06|         3699|        aSex Offense|           1|
|1/23/06|         2404|       Vehicle Theft|          28|
|1/24/06|         4803| Making False Report|           1|
| 1/6/06|         3562| Marijuana - Possess|           1|
|1/10/06|         2604|Fraud - Impersona...|           2|
|1/10/06|         2501|   Forgery Of Checks|           2|
|1/11/06|         5399|        Public Peace|           1|
|1/17/06|     
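To illustrate what the left outer join contributes here, the following plain-Python sketch mimics it with a dictionary lookup: crimes whose code has no entry in the lookup table are kept with a `None` charge rather than dropped. All data in it is made up, and `normalize_code` is a hypothetical helper that assumes codes are four characters wide with leading zeros, as the `Crime Code` column appears to be in the printed output.

```python
from collections import Counter

# Made-up lookup table: code -> charge name (stands in for crimeClarification).
charge_by_code = {"2404": "Vehicle Theft", "2299": "Burglary", "0101": "Treason"}

# Made-up crime rows: (date, numeric code). Code 9999 has no match in the lookup.
crimes = [("1/1/06", 2404), ("1/1/06", 2299), ("1/1/06", 2404), ("1/2/06", 9999)]


def normalize_code(code):
    """Render a code as a zero-padded 4-character string so 101 matches '0101'."""
    return str(code).strip().zfill(4)


counts = Counter()
for date, code in crimes:
    # dict.get returns None for an unmatched code, like the null side of a left outer join.
    charge = charge_by_code.get(normalize_code(code))
    counts[(date, charge)] += 1

for (date, charge), n in sorted(counts.items(), key=lambda kv: (kv[0][0], str(kv[0][1]))):
    print(date, charge, n)
```

An inner join would correspond to skipping the rows where `charge` is `None`, which silently lowers the per-day totals.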

In [53]:
# Task 6. What are the 5 most frequently occurring Crime Category Codes?
# NOTE: Crime levels can be found in ucr_ncic_codes.tsv
# Sort by count in descending order and keep the first 5 records; these are the most frequent codes.
from pyspark.sql.functions import desc

# Rows with a null category code (no match in the join) are ignored.
# The column name "Crime Category Code" contains spaces, so it is enclosed in backticks in the filter string.
most_occurring_crime_codes = joined_both.filter("`Crime Category Code` IS NOT NULL")\
                            .groupBy("Crime Category Code").agg(count("*").alias("crimes_count"))\
                            .orderBy(desc("crimes_count")).limit(5)

most_occurring_crime_codes.show()

+-------------------+------------+
|Crime Category Code|crimes_count|
+-------------------+------------+
|               BURG|         971|
|                 SV|         881|
|               LARC|         706|
|                 DP|         356|
|               ASSL|         325|
+-------------------+------------+
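Ordering by count and taking the first few rows leaves the order of tied counts unspecified. The sketch below shows the same top-N idea in plain Python with an explicit, deterministic tie-break (alphabetical by code) and with null values skipped. `top_n` and the sample list are hypothetical; the counts are made up and unrelated to the results above.

```python
from collections import Counter


def top_n(values, n):
    """Return the n most frequent non-null values as (value, count) pairs.

    Ties on count are broken alphabetically so the result is reproducible.
    """
    counts = Counter(v for v in values if v is not None)
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]


# Made-up category codes, including nulls from unmatched join rows.
sample_codes = ["BURG", "SV", "BURG", None, "LARC", "SV", "BURG", "DP", None, "LARC"]
print(top_n(sample_codes, 3))
```

The same pattern applies to the other "most frequent" tasks in this notebook: adding a secondary sort key makes the answer stable when two groups have equal counts.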



In [23]:
# Task 7. What are the 3 drugs most involved in crimes? (names and count)
# NOTE: Crime levels can be found in ucr_ncic_codes.tsv


In [56]:
joined_both.show(5)

+-----------+-------------------+--------+----------+----+--------------------+-------------+-----------+------------+------+----+-----------+--------------+----------+-------------------+--------------------+--------------------+--------------------+------------------------------------+
|  cdatetime|            address|district|      beat|grid|          crimedescr|ucr_ncic_code|   latitude|   longitude|  date|time|time_of_day|datetime_split|Crime Code|Crime Category Code|NCIC Charge Category|        Crime Charge|          Parameters|ICE Criminal Offense Level (Default)|
+-----------+-------------------+--------+----------+----+--------------------+-------------+-----------+------------+------+----+-----------+--------------+----------+-------------------+--------------------+--------------------+--------------------+------------------------------------+
|1/1/06 0:00| 3108 OCCIDENTAL DR|       3|3C        |1115|10851(A)VC TAKE V...|         2404|38.55042047|-121.3914158|1/1/06|0:00|   

In [60]:
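# Keep only crimes whose NCIC Charge Category is 'Dangerous Drugs'
dangerous_drug_crimes = joined_both.filter("`NCIC Charge Category` = 'Dangerous Drugs'")

# Group by Crime Charge (the drug-related charge name) and count occurrences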
top_drugs_involved = dangerous_drug_crimes.groupBy("`Crime Charge`").agg(count("*").alias("crimes_count"))\
                            .orderBy(desc("crimes_count")).limit(3)

top_drugs_involved.show()

+--------------------+------------+
|        Crime Charge|crimes_count|
+--------------------+------------+
|Dangerous Drugs (...|         129|
|   Cocaine - Possess|          66|
|Amphetamine - Pos...|          58|
+--------------------+------------+
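The result above is grouped by charge, so a single drug can be spread over several rows (possession, sale, and so on) and the generic "Dangerous Drugs" charge names no drug at all. As one possible refinement, the sketch below totals counts per drug name, assuming charges follow a `Drug - Action` pattern as the visible rows suggest. The charge list and counts are made up, and `drug_name` is a hypothetical helper.

```python
from collections import Counter

# Made-up (charge, count) pairs in the style of the "Crime Charge" column.
charge_counts = [
    ("Cocaine - Possess", 4),
    ("Cocaine - Sell", 3),
    ("Marijuana - Possess", 5),
    ("Amphetamine - Possess", 2),
    ("Dangerous Drugs", 6),
]


def drug_name(charge):
    """Take the text before ' - ' as the drug name (assumed naming pattern)."""
    return charge.split(" - ", 1)[0].strip()


# Sum the counts of all charges that refer to the same drug.
per_drug = Counter()
for charge, n in charge_counts:
    per_drug[drug_name(charge)] += n

# Drop the generic bucket, then rank by count with an alphabetical tie-break.
named = {d: n for d, n in per_drug.items() if not d.startswith("Dangerous Drugs")}
top_drugs = sorted(named.items(), key=lambda kv: (-kv[1], kv[0]))[:3]
print(top_drugs)
```

Whether the generic bucket should be excluded depends on how the task defines a "drug"; the sketch only shows that the choice changes the ranking.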

