## Big Data Engineering Semester Project

Before we set up our data consumer, we need to make sure a ZooKeeper service is running. With Apache Kafka, ZooKeeper is primarily used to track the status of the nodes (brokers) in the Kafka cluster and to maintain cluster metadata such as the list of Kafka topics.

In our case, ZooKeeper runs from an image in a Docker container. To start the container, navigate to the folder containing the ZooKeeper docker-compose file and execute the command 'docker-compose up -d'.
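To confirm that the container actually accepts connections before moving on, a small TCP check can help. The sketch below is only an illustration: the helper name `port_is_open` is hypothetical, and it assumes the compose file exposes ZooKeeper on its default client port 2181 on localhost.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


# Usage (assumed port mapping, adjust to your docker-compose file):
#   port_is_open("localhost", 2181)    # ZooKeeper default client port
#   port_is_open("localhost", 29092)   # Kafka bootstrap server used later in this notebook
```

A `False` result only tells you that nothing is listening on that port; `docker-compose ps` shows whether the container itself is up.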

Another tool that was used in our project is Offset Explorer, a desktop application that connects to a Kafka cluster and lets you browse all the topics and messages visually.

* Offset Explorer can be downloaded via this link - https://www.kafkatool.com/download.html

Additionally, don't forget to install PySpark and Spark itself, both of which are required to run this application.

* Apache Spark can be installed from here - https://spark.apache.org/downloads.html.
* PySpark can be installed via pip command - 'pip install pyspark'
* The Kafka client library for Python (kafka-python) can be installed via pip command - 'pip install kafka-python'

On top of that, don't forget to set the following environment variables for Spark on your machine:

* SPARK_HOME
* JAVA_HOME
* HADOOP_HOME
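A quick way to verify the three variables listed above from inside the notebook is sketched here. The helper name `missing_env_vars` is hypothetical and not part of the project.

```python
import os

REQUIRED_VARS = ("SPARK_HOME", "JAVA_HOME", "HADOOP_HOME")


def missing_env_vars(names=REQUIRED_VARS, env=None):
    """Return the variable names that are unset or empty in the given environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


# Usage: an empty list means all three variables are set.
#   print(missing_env_vars())
```

Passing a plain dictionary as `env` makes the check easy to try out without touching the real environment.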

**Create new Spark instance**

In order to use Spark Structured Streaming with Kafka, we need to initialise the Spark session with the Kafka connector package. Keep in mind that the version string ("org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1") must match the Scala version (here 2.12) and the Spark version (here 3.2.1) used on your machine.

The Spark version can be checked via this command - '!pyspark --version'
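Instead of editing the version string by hand, the package coordinate can be assembled from the version number. This is a sketch under two assumptions: the Spark build uses Scala 2.12, and the pip-installed PySpark version matches the Spark installation. The function name `kafka_connector_package` is made up for this example.

```python
def kafka_connector_package(spark_version: str, scala_version: str = "2.12") -> str:
    """Build the Maven coordinate of the Spark-Kafka connector for a Spark version."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


# Coordinate for the versions used in this notebook.
print(kafka_connector_package("3.2.1"))

# Usage with the installed PySpark version (not executed here):
#   import pyspark
#   package = kafka_connector_package(pyspark.__version__)
#   ... .config("spark.jars.packages", package)
```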

In [2]:
from pyspark.sql import SparkSession

# attention: the .config line is specific for the aida-n2

# Spark session & context
spark = (SparkSession
         .builder
         .appName('CrimeData')
         # Add kafka package (so that spark can find kafka for streaming)
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")
         .getOrCreate())
sc = spark.sparkContext

:: loading settings :: url = jar:file:/Users/vasilii/apache-spark/spark-3.2.1-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/vasilii/.ivy2/cache
The jars for the packages stored in: /Users/vasilii/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6c4d9ac0-bee9-4c9f-98ad-470a5e771a8f;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in local-m2-cache
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0

**Create stream dataframe setting kafka server, topic and offset option**

In [None]:
!pyspark --version


Now we connect to the previously created Kafka topic, to which the Kafka producer has written our data.

In [3]:
# Create stream dataframe setting kafka server, topic and offset option
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:29092") # kafka server
  .option("subscribe", "crimeData-chicago_100k") # topic
  .option("startingOffsets", "earliest") # start from beginning 
  .option("multiline", True)
  .load())

**Convert Key and Value pairs to String type and make it available as a dataframe again**

Casting the key and value pairs from the Kafka topic from binary to String lets us view the data in readable form.

In [None]:
display(df)

In [4]:
from pyspark.sql.types import StringType

# Convert binary to string key and value
df1 = (df
    .withColumn("key", df["key"].cast(StringType()))
    .withColumn("value", df["value"].cast(StringType())))

In [None]:
display(df1)

**Create a structure that is used as schema for the streamed data**

To parse the streamed data, Spark needs a schema, which is simply a description of the fields (names and types) found inside the value column of the Kafka topic.

In [5]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, ArrayType, StructField, BooleanType, LongType, IntegerType, DoubleType, DateType, TimestampType

# Event data schema
schema_wiki = StructType(
    [StructField("$schema",StringType(),True),
     StructField("id",StringType(),True),
     StructField("case_number",StringType(),True),
     StructField("date",TimestampType(),True),
     StructField("block",StringType(),True),
     StructField("iucr",StringType(),True),
     StructField("primary_type",StringType(),True),
     StructField("description",StringType(),True),
     StructField("location_description",StringType(),True),
     StructField("arrest",StringType(),True),
     StructField("domestic",StringType(),True),
     StructField("beat",StringType(),True),
     StructField("district",StringType(),True),
     StructField("ward",StringType(),True),
     StructField("community_area",StringType(),True),
     StructField("x_coordinate",StringType(),True),
     StructField("y_coordinate",StringType(),True),
     StructField("updated_on",StringType(),True),
     StructField("latitude",StringType(),True),
     StructField("longitude",StringType(),True),
     #StructField("latitude",StringType(),True),
     StructField("location", StructType(
                                    [StructField("latitude",StringType(),True),
                                    StructField("longitude",StringType(),True),
                                    StructField("human_address",StringType(),True),]),True)
     
    
    ]
     
      )

schema = ArrayType(schema_wiki, True)

# Create dataframe setting schema for event data
df_wiki = (df1
           # Sets schema for event data
           .withColumn("value", from_json("value", schema_wiki))
          )

In [None]:
display(df_wiki)
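To make the role of the schema more concrete, the plain-Python sketch below parses one message body the way a reader might picture `from_json` doing it. The field values are copied from the first row of the notebook output, but the exact JSON layout written by the producer is an assumption, and only a few of the fields are shown.

```python
import json

# Hypothetical message body (assumed layout, values taken from the first output row).
raw_value = json.dumps({
    "id": "12736693",
    "case_number": "JF287903",
    "primary_type": "BATTERY",
    "location": {"latitude": "41.793979431", "longitude": "-87.643208858"},
})

record = json.loads(raw_value)

# Top-level field, addressed in Spark SQL as value.primary_type.
print(record["primary_type"])
# Nested field, addressed in Spark SQL as value.location.latitude.
print(record["location"]["latitude"])
# A field missing from the message comes back as None here, much like a
# nullable column that ends up null in Spark.
print(record.get("ward"))
```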

#### Start streaming data from Kafka topic

In [6]:
# Create query stream with memory sink
queryStream = (df_wiki
 .writeStream
 .format("memory")
 .queryName("wiki_changes")
 .outputMode("update")
 .start())

22/06/29 11:58:08 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/vr/48qd0dvd3zx5h01r32l1_5k40000gn/T/temporary-b93a4a3b-9b98-4bc7-aaa0-c29e74b8a804. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/06/29 11:58:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [7]:
# Check active streams
for s in spark.streams.active:
    print("ID:{} | NAME:{}".format(s.id, s.name))

ID:3f4a5669-d3d5-49f8-8dae-9ebae4487290 | NAME:wiki_changes


#### Make queries to the streamed data 

Using the spark.sql() command we can query the data, and we can store the output either as a Spark or as a pandas DataFrame. We do both to showcase this functionality.

When using spark.sql() on our data, we have to add the prefix 'value.' to every column we want to query, because our data is stored within the value column of the Kafka topic. You can see from the df2 dataframe output that the 'crimeData-chicago_100k' topic has, alongside value, additional columns such as key, topic, partition, offset and timestamp.
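Because every column needs the 'value.' prefix, the select list gets long and is easy to mistype. One possible way to avoid repeating it is to generate the query text from the list of field names. The helper `build_select` is hypothetical; the field and table names are the ones used in this notebook.

```python
FIELDS = [
    "id", "case_number", "date", "block", "iucr", "primary_type",
    "description", "location_description", "arrest", "domestic", "beat",
    "district", "ward", "community_area", "x_coordinate", "y_coordinate",
    "updated_on", "latitude", "longitude", "location",
]


def build_select(fields, table="wiki_changes", struct_col="value"):
    """Return a SELECT statement that reads each field from the struct column."""
    cols = ",\n    ".join(f"{struct_col}.{name}" for name in fields)
    return f"select\n    {cols}\nfrom\n    {table}"


query = build_select(FIELDS)
print(query)

# Usage with the notebook's session (not executed here):
#   df = spark.sql(query)
```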

In [8]:
from time import sleep
from IPython.display import clear_output
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
import numpy as np 

matplotlib.rc('font', family='DejaVu Sans')
sns.set(style="whitegrid")

try:
    i=1
    while True:
        # Clear output
        clear_output(wait=True)
    
        df = spark.sql(
                """
                    select
                       value.id,
                       value.case_number,
                       value.date,
                       value.block,
                       value.iucr,
                       value.primary_type,
                       value.description,
                       value.location_description,
                       value.arrest,
                       value.domestic,
                       value.beat,
                       value.district,
                       value.ward,
                       value.community_area,
                       value.x_coordinate,
                       value.y_coordinate,
                       value.updated_on,
                       value.latitude,
                       value.longitude,
                       value.location
                    from
                        wiki_changes
                    
                """
        )#.toPandas()
        
        df.printSchema()
        print(df)

    
        
    

        df1 = spark.sql(
                """
                    select
                       value.id,
                       value.case_number,
                       value.date,
                       value.block,
                       value.iucr,
                       value.primary_type,
                       value.description,
                       value.location_description,
                       value.arrest,
                       value.domestic,
                       value.beat,
                       value.district,
                       value.ward,
                       value.community_area,
                       value.x_coordinate,
                       value.y_coordinate,
                       value.updated_on,
                       value.latitude,
                       value.longitude,
                       value.location
                       
                    from
                        wiki_changes
                    
                """
        ).toPandas()
        
        display(df1)
        
        df2 = spark.sql("select * from wiki_changes").toPandas()
        
        display(df2)

        
        sleep(10)
        i=i+1
except KeyboardInterrupt:
    print("process interrupted.")

root
 |-- id: string (nullable = true)
 |-- case_number: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- block: string (nullable = true)
 |-- iucr: string (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- description: string (nullable = true)
 |-- location_description: string (nullable = true)
 |-- arrest: string (nullable = true)
 |-- domestic: string (nullable = true)
 |-- beat: string (nullable = true)
 |-- district: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- community_area: string (nullable = true)
 |-- x_coordinate: string (nullable = true)
 |-- y_coordinate: string (nullable = true)
 |-- updated_on: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- human_address: string (nullable = true)

DataFrame[id: string, case_number: str

22/06/29 11:58:37 WARN TaskSetManager: Stage 1 contains a task of very large size (15094 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Unnamed: 0,id,case_number,date,block,iucr,primary_type,description,location_description,arrest,domestic,beat,district,ward,community_area,x_coordinate,y_coordinate,updated_on,latitude,longitude,location
0,12736693,JF287903,2022-06-20 23:56:00,007XX W GARFIELD BLVD,0486,BATTERY,DOMESTIC BATTERY SIMPLE,APARTMENT,false,true,0711,007,20,68,1172439,1868272,2022-06-27T16:49:20.000,41.793979431,-87.643208858,"(41.793979431, -87.643208858, {""address"": """", ..."
1,12736637,JF287926,2022-06-20 23:56:00,015XX S KARLOV AVE,143A,WEAPONS VIOLATION,UNLAWFUL POSSESSION - HANDGUN,STREET,true,false,1012,010,24,29,1149303,1892129,2022-06-27T16:49:20.000,41.859924664,-87.727431051,"(41.859924664, -87.727431051, {""address"": """", ..."
2,12736641,JF287904,2022-06-20 23:54:00,005XX N OGDEN AVE,051A,ASSAULT,AGGRAVATED - HANDGUN,STREET,true,false,1214,012,27,24,1167891,1903682,2022-06-27T16:49:20.000,41.891246762,-87.658866407,"(41.891246762, -87.658866407, {""address"": """", ..."
3,12736591,JF287899,2022-06-20 23:54:00,034XX S ARCHER AVE,1345,CRIMINAL DAMAGE,TO CITY OF CHICAGO PROPERTY,STREET,false,false,0912,009,12,59,1163091,1881589,2022-06-27T16:49:20.000,41.830723665,-87.677114617,"(41.830723665, -87.677114617, {""address"": """", ..."
4,12736636,JF287914,2022-06-20 23:52:00,006XX W 103RD ST,1320,CRIMINAL DAMAGE,TO VEHICLE,STREET,false,false,2232,022,9,73,1173886,1836576,2022-06-27T16:49:20.000,41.706969624,-87.638840966,"(41.706969624, -87.638840966, {""address"": """", ..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,12579526,JE491744,2021-12-29 10:11:00,047XX W ADAMS ST,1477,WEAPONS VIOLATION,RECKLESS FIREARM DISCHARGE,ALLEY,true,false,1113,011,28,25,1144798,1898706,2022-01-05T15:49:10.000,41.878058829,-87.743802049,"(41.878058829, -87.743802049, {""address"": """", ..."
99996,12579529,JE491750,2021-12-29 10:09:00,123XX S YALE AVE,1477,WEAPONS VIOLATION,RECKLESS FIREARM DISCHARGE,ALLEY,false,false,0523,005,34,53,1176840,1823134,2022-01-05T15:49:10.000,41.670016883,-87.62842629,"(41.670016883, -87.62842629, {""address"": """", ""..."
99997,12579610,JE491722,2021-12-29 10:08:00,005XX N MICHIGAN AVE,0460,BATTERY,SIMPLE,DEPARTMENT STORE,true,false,1834,018,42,8,1177342,1903838,2022-01-05T15:49:10.000,41.891465732,-87.624153044,"(41.891465732, -87.624153044, {""address"": """", ..."
99998,12579538,JE491797,2021-12-29 10:00:00,056XX W NORTH AVE,1320,CRIMINAL DAMAGE,TO VEHICLE,RESIDENCE,false,false,2531,025,29,25,1138253,1910065,2022-01-05T15:49:10.000,41.9093502,-87.76755907,"(41.9093502, -87.76755907, {""address"": """", ""ci..."


Unnamed: 0,key,value,topic,partition,offset,timestamp,timestampType
0,,"(None, 12736693, JF287903, 2022-06-20 23:56:00...",crimeData-chicago_100k,0,0,2022-06-28 18:58:07.956,0
1,,"(None, 12736637, JF287926, 2022-06-20 23:56:00...",crimeData-chicago_100k,0,1,2022-06-28 18:58:07.956,0
2,,"(None, 12736641, JF287904, 2022-06-20 23:54:00...",crimeData-chicago_100k,0,2,2022-06-28 18:58:07.956,0
3,,"(None, 12736591, JF287899, 2022-06-20 23:54:00...",crimeData-chicago_100k,0,3,2022-06-28 18:58:07.956,0
4,,"(None, 12736636, JF287914, 2022-06-20 23:52:00...",crimeData-chicago_100k,0,4,2022-06-28 18:58:07.957,0
...,...,...,...,...,...,...,...
99995,,"(None, 12579526, JE491744, 2021-12-29 10:11:00...",crimeData-chicago_100k,0,99995,2022-06-28 18:58:56.703,0
99996,,"(None, 12579529, JE491750, 2021-12-29 10:09:00...",crimeData-chicago_100k,0,99996,2022-06-28 18:58:56.703,0
99997,,"(None, 12579610, JE491722, 2021-12-29 10:08:00...",crimeData-chicago_100k,0,99997,2022-06-28 18:58:56.703,0
99998,,"(None, 12579538, JE491797, 2021-12-29 10:00:00...",crimeData-chicago_100k,0,99998,2022-06-28 18:58:56.703,0


process interrupted.


In [None]:
# stop stream
queryStream.stop()
#sc.stop()

Double-checking that all the data has been streamed. There should be 100,000 rows.

In [11]:
df1.shape

(100000, 20)
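Rather than interrupting the loop by hand and then checking the shape, the row count can be polled until the expected number arrives. The following is a minimal sketch; `wait_for_row_count` is a hypothetical helper that takes any zero-argument function returning the current count.

```python
import time


def wait_for_row_count(get_count, expected, poll_seconds=5.0, timeout_seconds=300.0):
    """Poll get_count() until it reaches `expected` or the timeout passes.

    Returns the last observed count, which may be below `expected` on timeout.
    """
    deadline = time.monotonic() + timeout_seconds
    count = get_count()
    while count < expected and time.monotonic() < deadline:
        time.sleep(poll_seconds)
        count = get_count()
    return count


# Usage with the notebook's in-memory table (not executed here):
#   rows = wait_for_row_count(lambda: spark.table("wiki_changes").count(), 100_000)
```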

Enabling the spark.sql.repl.eagerEval.enabled configuration for eager evaluation of PySpark DataFrames:

In [None]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

### Starting basic analysis

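We can use both the Spark DataFrame API and Spark SQL to query the data.

#### Finding the most common crime type

We group by the "primary_type" column and count the rows to get the total number of crimes per type. The DataFrame version sorts in ascending order, so its output lists the rarest types first; the SQL version sorts in descending order.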

In [21]:
# show() returns None, so keep the DataFrame in the variable and display it separately
crimeByType = df.groupby("primary_type").count().orderBy("count")
crimeByType.show()

22/06/29 12:12:40 WARN TaskSetManager: Stage 23 contains a task of very large size (15094 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+-----+
|        primary_type|count|
+--------------------+-----+
|OTHER NARCOTIC VI...|    1|
|        NON-CRIMINAL|    2|
|    PUBLIC INDECENCY|    2|
|            GAMBLING|    5|
|   HUMAN TRAFFICKING|   10|
|           OBSCENITY|   14|
|          KIDNAPPING|   43|
|        INTIMIDATION|   64|
|LIQUOR LAW VIOLATION|   90|
|CONCEALED CARRY L...|   91|
|        PROSTITUTION|  138|
|            STALKING|  171|
|               ARSON|  177|
|INTERFERENCE WITH...|  188|
|            HOMICIDE|  301|
|PUBLIC PEACE VIOL...|  323|
|         SEX OFFENSE|  555|
|CRIMINAL SEXUAL A...|  684|
|OFFENSE INVOLVING...|  913|
|   CRIMINAL TRESPASS| 1913|
+--------------------+-----+
only showing top 20 rows



In [22]:
crimeByTypeSql = spark.sql("""
                select
                      wiki_changes.value.primary_type,
                       count(*)
                       
                    from
                        wiki_changes
                        group by wiki_changes.value.primary_type
                        order by count(*) desc
                        
""")
crimeByTypeSql.show()

22/06/29 12:13:01 WARN TaskSetManager: Stage 26 contains a task of very large size (15094 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+--------+
|        primary_type|count(1)|
+--------------------+--------+
|               THEFT|   22399|
|             BATTERY|   19034|
|     CRIMINAL DAMAGE|   11095|
|             ASSAULT|    9302|
|       OTHER OFFENSE|    7044|
|  DECEPTIVE PRACTICE|    6369|
| MOTOR VEHICLE THEFT|    5918|
|   WEAPONS VIOLATION|    4122|
|             ROBBERY|    3771|
|            BURGLARY|    3262|
|           NARCOTICS|    1999|
|   CRIMINAL TRESPASS|    1913|
|OFFENSE INVOLVING...|     913|
|CRIMINAL SEXUAL A...|     684|
|         SEX OFFENSE|     555|
|PUBLIC PEACE VIOL...|     323|
|            HOMICIDE|     301|
|INTERFERENCE WITH...|     188|
|               ARSON|     177|
|            STALKING|     171|
+--------------------+--------+
only showing top 20 rows



In our dataset, Theft and Battery are clearly the most common crime types, with about 22,400 and 19,000 entries respectively. Criminal Damage completes the top 3 with around 11,100 entries.

#### Finding the most common location where crimes were committed

We are using the "location_description" column, which describes the location where the crime was committed.

In [23]:
crimeByLocation = spark.sql("""
                select
                      wiki_changes.value.location_description,
                       count(*)
                       
                    from
                        wiki_changes
                        group by wiki_changes.value.location_description
                        order by count(*) desc
                        
""")
crimeByLocation.show()

22/06/29 12:18:46 WARN TaskSetManager: Stage 29 contains a task of very large size (15094 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+--------+
|location_description|count(1)|
+--------------------+--------+
|              STREET|   25651|
|           APARTMENT|   21007|
|           RESIDENCE|   13212|
|            SIDEWALK|    4861|
|PARKING LOT / GAR...|    3549|
|  SMALL RETAIL STORE|    3212|
|          RESTAURANT|    2115|
|               ALLEY|    2025|
|    DEPARTMENT STORE|    1632|
|COMMERCIAL / BUSI...|    1511|
|     OTHER (SPECIFY)|    1474|
|         GAS STATION|    1463|
|VEHICLE NON-COMME...|    1369|
|RESIDENCE - PORCH...|    1341|
|  RESIDENCE - GARAGE|    1142|
|  GROCERY FOOD STORE|    1060|
|SCHOOL - PUBLIC B...|    1017|
|RESIDENCE - YARD ...|     906|
|SCHOOL - PUBLIC G...|     759|
|       BAR OR TAVERN|     687|
+--------------------+--------+
only showing top 20 rows



Again, we have our top 3 locations: Street, Apartment and Residence. While crimes committed on the street are common, apartment crimes are a different story. Let's find out whether any of those crimes can be classified as domestic violence.

#### Finding out the rate of domestic violence

We are using the "domestic" column, which has the values true/false and indicates whether the crime can be classified as domestic violence.

We are looking into the ratio for crimes that happened inside an apartment.

In [25]:
crimeDomestic = spark.sql("""
                select
                      wiki_changes.value.domestic,
                       count(*)
                       
                    from
                        wiki_changes
                        where wiki_changes.value.location_description = "APARTMENT"
                        group by wiki_changes.value.domestic
                        order by count(*) desc
                        
""")
crimeDomestic.show()

22/06/29 12:27:13 WARN TaskSetManager: Stage 32 contains a task of very large size (15094 KiB). The maximum recommended task size is 1000 KiB.


+--------+--------+
|domestic|count(1)|
+--------+--------+
|   false|   11297|
|    true|    9710|
+--------+--------+



Roughly 46% of the crimes reported in apartments (9,710 of 21,007) are classified as domestic violence.

Let's quickly look at the whole picture: domestic violence across all reported locations.

In [26]:
crimesDomestilAll = spark.sql("""
                select
                      wiki_changes.value.domestic,
                       count(*)
                       
                    from
                        wiki_changes
                        group by wiki_changes.value.domestic
                        order by count(*) desc
                        
""")
crimesDomestilAll.show()

22/06/29 12:33:48 WARN TaskSetManager: Stage 35 contains a task of very large size (15094 KiB). The maximum recommended task size is 1000 KiB.


+--------+--------+
|domestic|count(1)|
+--------+--------+
|   false|   80080|
|    true|   19920|
+--------+--------+



22/06/29 13:14:53 WARN KafkaOffsetReaderConsumer: Error in attempt 1 getting Kafka offsets: 
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition crimeData-chicago_100k-0 could be determined


From the output we can see that around 20% of all reported crimes (19,920 of 100,000) are domestic. Compared with the roughly 46% share among crimes reported in apartments, this suggests that domestic violence is considerably more likely to be reported inside the home.
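The two shares can be recomputed directly from the counts in the query outputs. The helper `share` is only an illustration.

```python
def share(part: int, total: int) -> float:
    """Return part/total as a percentage rounded to one decimal place."""
    return round(100.0 * part / total, 1)


# Counts copied from the two query outputs.
apartment_domestic = share(9710, 9710 + 11297)   # domestic share among apartment crimes
overall_domestic = share(19920, 19920 + 80080)   # domestic share among all crimes

print(apartment_domestic, overall_domestic)  # 46.2 19.9
```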

## Saving streamed data to a flat file and continuing our research

We are going to save the unprocessed data to a flat file in Parquet format.

We want to save the data before starting any data manipulation, in order to have a safe backup and the opportunity to share this file with other components of our project.

Spark natively supports writing Parquet files. All we need to do is call write.parquet() on the dataframe and provide it with the new Parquet file name.

In [12]:
df.write.parquet("crimeData2.parquet")

22/06/29 01:20:12 WARN TaskSetManager: Stage 3 contains a task of very large size (15094 KiB). The maximum recommended task size is 1000 KiB.
22/06/29 01:20:44 ERROR Utils: Aborting task                        (0 + 4) / 4]
java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
	at java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
	at org.apache.parquet.io.api.Binary$ByteArrayBackedBinary.toByteBuffer(Binary.java:333)
	at org.apache.parquet.schema.PrimitiveComparator$BinaryComparator.compareNotNulls(PrimitiveComparator.java:186)
	at org.apache.parquet.schema.PrimitiveComparator$BinaryComparator.compareNotNulls(PrimitiveComparator.java:183)
	at org.apache.parquet.schema.PrimitiveComparator.compare(PrimitiveComparator.java:63)
	at org.apache.parquet.column.statistics.BinaryStatistics.updateStats(BinaryStatistics.java:60)
	at org.apache.parquet.column.impl.ColumnWriterBase.write(ColumnWriterBase.java:240)
	at org.apache.parquet.io.MessageColumn

	at org.apache.parquet.schema.PrimitiveComparator.compare(PrimitiveComparator.java:63)
	at org.apache.parquet.column.statistics.BinaryStatistics.updateStats(BinaryStatistics.java:60)
	at org.apache.parquet.column.impl.ColumnWriterBase.write(ColumnWriterBase.java:240)
	at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addBinary(MessageColumnIO.java:476)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9(ParquetWriteSupport.scala:206)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9$adapted(ParquetWriteSupport.scala:204)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$Lambda$3456/579064316.apply(Unknown Source)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$writeFields$1(ParquetWriteSupport.scala:162)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$Lambda$3501/1604491665.apply$mcV$sp

	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
22/06/29 01:20:58 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 3.0 in stage 3.0 (TID 12),5,main]
org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tr

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/vasilii/opt/anaconda3/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3369, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/var/folders/vr/48qd0dvd3zx5h01r32l1_5k40000gn/T/ipykernel_62864/2232691093.py", line 1, in <cell line: 1>
    df.write.parquet("crimeData2.parquet")
  File "/Users/vasilii/opt/anaconda3/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 885, in parquet
    self._jwrite.parquet(path)
  File "/Users/vasilii/opt/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Users/vasilii/opt/anaconda3/lib/python3.9/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/Users/vasilii/opt/anaconda3/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaErr

ConnectionRefusedError: [Errno 61] Connection refused
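The write above failed with java.lang.OutOfMemoryError. One alternative to try, sketched here with no guarantee that it resolves the problem on this machine, is to let the streaming query write Parquet files itself instead of exporting the in-memory table in one go. The helper name `start_parquet_sink` and the directory names in the usage comment are hypothetical. The function expects a streaming DataFrame such as `df_wiki`, because `df` was reassigned to a query result earlier in the notebook.

```python
def start_parquet_sink(stream_df, output_dir, checkpoint_dir):
    """Start a streaming query that appends each micro-batch to Parquet files."""
    return (stream_df
            .writeStream
            .format("parquet")                             # file sink
            .option("path", output_dir)                    # directory for the part files
            .option("checkpointLocation", checkpoint_dir)  # required by the file sink
            .outputMode("append")                          # the file sink supports append only
            .start())


# Usage with the notebook's streaming DataFrame (not executed here):
#   query = start_parquet_sink(df_wiki, "crimeData_stream.parquet", "checkpoints/crimeData")
#   query.awaitTermination(60)   # let it run for up to 60 seconds
#   query.stop()
```

If the batch write is preferred, raising the driver memory (for example through `spark.driver.memory` when the session is created) is another option to experiment with.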

### Reading out parquet files

There are two ways we can read a Parquet file.

1) Using pandas and the pd.read_parquet() function, supplying the path to the file and the Parquet engine we are going to use (fastparquet in our case).

2) Using Spark by creating a new sqlContext variable and using its read.parquet() function. SQLContext is deprecated since Spark 3.0; calling spark.read.parquet() on the existing session works the same way.

Both ways provide us with the needed functionality:

#### Using Pandas

In [None]:
import pandas as pd
pandas_Crime_df= pd.read_parquet('crimeData.parquet', engine='fastparquet')

In [None]:
spark_Crime_df = spark.createDataFrame(pandas_Crime_df)

#### Using Spark

In [13]:
# using SQLContext to read parquet file
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# to read parquet file
df = sqlContext.read.parquet('crimeData.parquet')



ConnectionRefusedError: [Errno 61] Connection refused