In [1]:
import os

import findspark

# Locate the Spark installation: use SPARK_HOME when it is set and non-empty,
# otherwise fall back to the install path this notebook was written against.
# This keeps the notebook runnable on machines where Spark lives elsewhere.
findspark.init(os.environ.get('SPARK_HOME') or '/home/ubuntu/spark-3.2.1-bin-hadoop2.7')

# The pyspark imports stay below findspark.init(), as in the original cell order.
import pyspark
from pyspark.sql import SparkSession

# Reuse the active session if one exists; otherwise start one named 'basics'.
spark = SparkSession.builder.appName('basics').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/03 07:37:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/03 07:37:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType, IntegerType

# Read the first CSV with its header row, then cast 'Value' to float and
# 'RecordID' to integer.
df1 = (
    spark.read.csv('violence_data.csv', header=True)
    .withColumn("Value", col("Value").cast(FloatType()))
    .withColumn("RecordID", col("RecordID").cast(IntegerType()))
)

# Read the second CSV with its header row, then cast 'CANTIDAD' to integer.
df2 = (
    spark.read.csv('Reporte_Delito_Violencia_Intrafamiliar_Polic_a_Nacional.csv', header=True)
    .withColumn("CANTIDAD", col("CANTIDAD").cast(IntegerType()))
)

In [3]:
# Print the schema of each loaded frame, df1 first and then df2.
for frame in (df1, df2):
    frame.printSchema()

root
 |-- RecordID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Demographics Question: string (nullable = true)
 |-- Demographics Response: string (nullable = true)
 |-- Question: string (nullable = true)
 |-- Survey Year: string (nullable = true)
 |-- Value: float (nullable = true)

root
 |-- DEPARTAMENTO: string (nullable = true)
 |-- MUNICIPIO: string (nullable = true)
 |-- CODIGO DANE: string (nullable = true)
 |-- ARMAS MEDIOS: string (nullable = true)
 |-- FECHA HECHO: string (nullable = true)
 |-- GENERO: string (nullable = true)
 |-- GRUPO ETARIO: string (nullable = true)
 |-- CANTIDAD: integer (nullable = true)



In [4]:
# Columns of df2 that are removed in this cell.
columns_to_drop = ['DEPARTAMENTO', 'MUNICIPIO', 'CODIGO DANE', 'GRUPO ETARIO']

# Remove the 'RecordID' column from df1.
df1 = df1.drop('RecordID')

# Keep every df2 column that is not listed in columns_to_drop, preserving order.
kept_columns = [name for name in df2.columns if name not in columns_to_drop]
df2 = df2.select(kept_columns)

In [5]:
# Preview both frames, df1 first and then df2.
for frame in (df1, df2):
    frame.show()

+-----------+------+---------------------+---------------------+--------------------+-----------+-----+
|    Country|Gender|Demographics Question|Demographics Response|            Question|Survey Year|Value|
+-----------+------+---------------------+---------------------+--------------------+-----------+-----+
|Afghanistan|     F|       Marital status|        Never married|... if she burns ...| 01/01/2015| null|
|Afghanistan|     F|            Education|               Higher|... if she burns ...| 01/01/2015| 10.1|
|Afghanistan|     F|            Education|            Secondary|... if she burns ...| 01/01/2015| 13.7|
|Afghanistan|     F|            Education|              Primary|... if she burns ...| 01/01/2015| 13.8|
|Afghanistan|     F|       Marital status| Widowed, divorced...|... if she burns ...| 01/01/2015| 13.8|
|Afghanistan|     F|           Employment|    Employed for kind|... if she burns ...| 01/01/2015| 17.0|
|Afghanistan|     F|                  Age|                15-24|

In [6]:
from pyspark.sql.functions import col, date_format, to_date, year
from pyspark.sql.types import DateType

# Parse the 'FECHA HECHO' strings into dates using the 'd/MM/yyyy' pattern.
df3 = df2.withColumn('FECHA HECHO', to_date(col('FECHA HECHO'), 'd/MM/yyyy'))

# Keep only rows whose year is >= 2015 and < 2021, then add 'FECHA HECHO_STR',
# a 'yyyy-MM-dd HH:mm:ss' formatted string copy of the date.
df3_filtered = (
    df3
    .filter((year('FECHA HECHO') >= 2015) & (year('FECHA HECHO') < 2021))
    .withColumn('FECHA HECHO_STR', date_format('FECHA HECHO', 'yyyy-MM-dd HH:mm:ss'))
)

# Summarise the string copy; the untruncated output shows the full min/max values.
df3_filtered.select('FECHA HECHO_STR').describe().show(truncate=False)

[Stage 4:>                                                          (0 + 2) / 2]

+-------+-------------------+
|summary|FECHA HECHO_STR    |
+-------+-------------------+
|count  |349494             |
|mean   |null               |
|stddev |null               |
|min    |2015-01-01 00:00:00|
|max    |2020-12-31 00:00:00|
+-------+-------------------+



                                                                                

In [7]:
# Drop every df1 row whose 'Value' is null.
df4 = df1.na.drop(subset=['Value'])

# Sanity check: select any remaining rows with a null 'Value' and report the result.
is_null = df4.filter(col('Value').isNull())
print(
    "Null value exists"
    if is_null.count() > 0
    else "The null value does not exist in dataset1"
)

The null value does not exist in dataset1


In [8]:
# Summary statistics for every column of df4.
df4_stats = df4.describe()
df4_stats.show()

[Stage 10:>                                                         (0 + 1) / 1]

+-------+-----------+------+---------------------+---------------------+--------------------+-----------+------------------+
|summary|    Country|Gender|Demographics Question|Demographics Response|            Question|Survey Year|             Value|
+-------+-----------+------+---------------------+---------------------+--------------------+-----------+------------------+
|  count|      11187| 11187|                11187|                11187|               11187|      11187|             11187|
|   mean|       null|  null|                 null|                 null|                null|       null| 19.76253687099183|
| stddev|       null|  null|                 null|                 null|                null|       null|16.986436699962074|
|    min|Afghanistan|     F|                  Age|                15-24|... for at least ...| 01/01/2000|               0.0|
|    max|   Zimbabwe|     M|            Residence| Widowed, divorced...|... if she refuse...| 01/01/2018|              86.9|


                                                                                

In [9]:
# Write df4 as CSV (with a header row) to the 'dataset1' output directory.
# mode='overwrite' replaces output left by a previous run; with the default
# save mode, Spark raises when the path already exists, so re-running the
# notebook would fail at this cell.
df4.write.csv('dataset1', header=True, mode='overwrite')

In [9]:
# Collect the distinct values of each categorical column of df3 (the frame
# before the year filter) into plain Python lists.
unique_values_gen = [row[0] for row in df3.select('GENERO').distinct().collect()]
unique_values_armas = [row[0] for row in df3.select('ARMAS MEDIOS').distinct().collect()]

print(" 'GENERO' column:", unique_values_gen)
print(" 'ARMAS MEDIOS' column:", unique_values_armas)

[Stage 16:>                                                         (0 + 2) / 2]

 'GENERO' column: ['FEMENINO', 'MASCULINO', 'NO REPORTA', None, '-']
 'ARMAS MEDIOS' column: ['ARMA DE FUEGO', 'ESCOPOLAMINA', 'SIN EMPLEO DE ARMAS', 'ARMA BLANCA / CORTOPUNZANTE', 'NO REPORTADO', 'CONTUNDENTES', 'CORTANTES', 'CORTOPUNZANTES', 'NO REPORTA', 'PUNZANTES', None, '-']


                                                                                

In [10]:
# Placeholder values excluded from 'GENERO' and from 'ARMAS MEDIOS' respectively.
values_to_remove = ['-', 'NO REPORTA']
values_to_remove1 = ['-']

# Apply the three conditions one after another; a row is kept only if it
# passes all of them, which is the same as combining them with '&'.
df5 = (
    df3_filtered
    .filter(~col('GENERO').isin(values_to_remove))
    .filter(~col('ARMAS MEDIOS').isin(values_to_remove1))
    .filter(col('CANTIDAD') < 20)
)


In [11]:
# Summary statistics for the 'CANTIDAD' column after filtering.
cantidad_stats = df5.select('CANTIDAD').describe()
cantidad_stats.show()

[Stage 19:>                                                         (0 + 2) / 2]

+-------+------------------+
|summary|          CANTIDAD|
+-------+------------------+
|  count|            347184|
|   mean|1.4986923360523527|
| stddev|1.6285316924048772|
|    min|                 1|
|    max|                19|
+-------+------------------+





In [26]:
# A row matches when any of the four checked columns is null.
null_in_any_column = (
    df5['GENERO'].isNull()
    | df5['ARMAS MEDIOS'].isNull()
    | df5['CANTIDAD'].isNull()
    | df5['FECHA HECHO'].isNull()
)
has_null_values = df5.filter(null_in_any_column)

# Report whether at least one such row exists.
print(
    "Null value exists in the DataFrame"
    if has_null_values.count() > 0
    else "No null values in the DataFrame"
)

[Stage 66:>                                                         (0 + 2) / 2]

No null values in the DataFrame




In [13]:
# Summary statistics for the filtered frame.
df5_stats = df5.describe()
df5_stats.show()



+-------+--------------------+---------+------------------+-------------------+
|summary|        ARMAS MEDIOS|   GENERO|          CANTIDAD|    FECHA HECHO_STR|
+-------+--------------------+---------+------------------+-------------------+
|  count|              347184|   347184|            347184|             347184|
|   mean|                null|     null|1.4986923360523527|               null|
| stddev|                null|     null|1.6285316924048772|               null|
|    min|ARMA BLANCA / COR...| FEMENINO|                 1|2015-01-01 00:00:00|
|    max| SIN EMPLEO DE ARMAS|MASCULINO|                19|2020-12-31 00:00:00|
+-------+--------------------+---------+------------------+-------------------+



                                                                                

In [28]:
# Collect the distinct values still present in each categorical column of df5
# into plain Python lists.
unique_values_gen = [row[0] for row in df5.select('GENERO').distinct().collect()]
unique_values_armas = [row[0] for row in df5.select('ARMAS MEDIOS').distinct().collect()]

print(" 'GENERO' column:", unique_values_gen)
print(" 'ARMAS MEDIOS' column:", unique_values_armas)

[Stage 81:>                                                         (0 + 2) / 2]

 'GENERO' column: ['FEMENINO', 'MASCULINO']
 'ARMAS MEDIOS' column: ['ARMA DE FUEGO', 'ESCOPOLAMINA', 'SIN EMPLEO DE ARMAS', 'ARMA BLANCA / CORTOPUNZANTE', 'NO REPORTADO', 'CONTUNDENTES', 'CORTANTES', 'CORTOPUNZANTES', 'NO REPORTA', 'PUNZANTES']


                                                                                

In [35]:
from pyspark.sql.functions import col, count

# Count the rows in each (date, quantity, gender, weapon) combination.
# NOTE(review): this counts rows per group and does not sum 'CANTIDAD' —
# confirm that is the intended meaning of "Number of domestic violence".
grouped_data = (
    df5
    .groupBy('FECHA HECHO', 'CANTIDAD', 'GENERO', 'ARMAS MEDIOS')
    .agg(count("*").alias("Number of domestic violence"))
)

# Order the groups by date and preview the first rows.
sorted_data = grouped_data.orderBy('FECHA HECHO')
sorted_data.show()

[Stage 109:>                                                        (0 + 2) / 2]

+-----------+--------+---------+--------------------+---------------------------+
|FECHA HECHO|CANTIDAD|   GENERO|        ARMAS MEDIOS|Number of domestic violence|
+-----------+--------+---------+--------------------+---------------------------+
| 2015-01-01|       1| FEMENINO| SIN EMPLEO DE ARMAS|                         24|
| 2015-01-01|      10| FEMENINO|        CONTUNDENTES|                          1|
| 2015-01-01|       6| FEMENINO|        CONTUNDENTES|                          4|
| 2015-01-01|      10|MASCULINO|        CONTUNDENTES|                          1|
| 2015-01-01|       2| FEMENINO|        CONTUNDENTES|                          7|
| 2015-01-01|       3| FEMENINO|        CONTUNDENTES|                          8|
| 2015-01-01|       2|MASCULINO| SIN EMPLEO DE ARMAS|                          2|
| 2015-01-01|       5| FEMENINO|        CONTUNDENTES|                          2|
| 2015-01-01|       1| FEMENINO|        CONTUNDENTES|                         65|
| 2015-01-01|   

                                                                                

In [47]:
# Write the grouped, date-ordered result as CSV (with a header row) to the
# 'dataset2015-2021' output directory.
# mode='overwrite' replaces output left by a previous run; with the default
# save mode, Spark raises when the path already exists, so re-running the
# notebook would fail at this cell.
sorted_data.write.csv('dataset2015-2021', header=True, mode='overwrite')

                                                                                