In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Data_Preparation').getOrCreate()

In [2]:
# Let's read in the data. Note that it's in the format of JSON.
dog_rawdata_merged = spark.read.load("Datasets/Dog_registred_hamilton_new_v1_2_4.csv", format="csv", header ="true")
dog_rawdata_merged.show()

+----------+------------------+--------------------+-------------------+---------------------+-------------+-------------+-----------------+-----------+----------+-------+------+--------------+-------------------+-------------------+------------+--------------+---------------+-----------------+-----------------+---------------+--------------------+
|Dog_Number|Primary_Breed_Code|Secondary_Breed_Code|Primary_Colour_Code|Secondary_Colour_Code|Date_Of_Birth|Date_Of_Death|Date_Of_Departure|        Age|Animal_Sex|Desexed|Worker|Classification|Classification_Date|Classified_Date_Age|Dog_Number_2|Microchip_Flag|Microchip_Brand|Offence_Free_Flag|Active_Dog_Record|Total_Complains|         hash_binary|
+----------+------------------+--------------------+-------------------+---------------------+-------------+-------------+-----------------+-----------+----------+-------+------+--------------+-------------------+-------------------+------------+--------------+---------------+-----------------+---

In [3]:
dog_rawdata_merged.filter(dog_rawdata_merged['Dog_Number'] == "103050").show() 

+----------+------------------+--------------------+-------------------+---------------------+-------------+-------------+-----------------+-----------+----------+-------+------+--------------+-------------------+-------------------+------------+--------------+---------------+-----------------+-----------------+---------------+--------------------+
|Dog_Number|Primary_Breed_Code|Secondary_Breed_Code|Primary_Colour_Code|Secondary_Colour_Code|Date_Of_Birth|Date_Of_Death|Date_Of_Departure|        Age|Animal_Sex|Desexed|Worker|Classification|Classification_Date|Classified_Date_Age|Dog_Number_2|Microchip_Flag|Microchip_Brand|Offence_Free_Flag|Active_Dog_Record|Total_Complains|         hash_binary|
+----------+------------------+--------------------+-------------------+---------------------+-------------+-------------+-----------------+-----------+----------+-------+------+--------------+-------------------+-------------------+------------+--------------+---------------+-----------------+---

In [4]:
columns_to_drop = ['Dog_Number_2','Microchip_Brand','Offence_Free_Flag','Active_Dog_Record','hash_binary']
dog_dropdata_merged= dog_rawdata_merged.drop(*columns_to_drop)
dog_dropdata_merged.show()

+----------+------------------+--------------------+-------------------+---------------------+-------------+-------------+-----------------+-----------+----------+-------+------+--------------+-------------------+-------------------+--------------+---------------+
|Dog_Number|Primary_Breed_Code|Secondary_Breed_Code|Primary_Colour_Code|Secondary_Colour_Code|Date_Of_Birth|Date_Of_Death|Date_Of_Departure|        Age|Animal_Sex|Desexed|Worker|Classification|Classification_Date|Classified_Date_Age|Microchip_Flag|Total_Complains|
+----------+------------------+--------------------+-------------------+---------------------+-------------+-------------+-----------------+-----------+----------+-------+------+--------------+-------------------+-------------------+--------------+---------------+
|    100320|              BDCO|                BDCO|                BLK|                  WHI|    3/10/1995|         null|         7/8/2005| 10.3369863|         F|      N|     N|Not Applicable|            

In [5]:
dog_dropdata_merged.columns

['Dog_Number',
 'Primary_Breed_Code',
 'Secondary_Breed_Code',
 'Primary_Colour_Code',
 'Secondary_Colour_Code',
 'Date_Of_Birth',
 'Date_Of_Death',
 'Date_Of_Departure',
 'Age',
 'Animal_Sex',
 'Desexed',
 'Worker',
 'Classification',
 'Classification_Date',
 'Classified_Date_Age',
 'Microchip_Flag',
 'Total_Complains']

In [6]:
dog_dropdata_merged.filter("Age < 0").count()

12173

In [7]:
dog_dropdata_merged.filter("Age == 0").count()

3

In [8]:
dog_dropdata_merged.filter("Age > 0").count()

59252

In [9]:
dog_dropdata_merged.filter("Age < 0").select("Age").sort("Age", ascending=False).show()

+------------+
|         Age|
+------------+
| -87.6630137|
|-79.84931507|
|-74.22739726|
|-7.019178082|
|-59.10684932|
|-4.945205479|
|-4.630136986|
|-4.621917808|
|-4.619178082|
|-4.616438356|
|-4.610958904|
|-4.605479452|
|-4.553424658|
|-4.421917808|
|-4.128767123|
|-3.821917808|
|-3.745205479|
|-3.728767123|
| -3.57260274|
| -3.57260274|
+------------+
only showing top 20 rows



In [10]:
from pyspark.sql.functions import when
dog_changeAge_merged = dog_dropdata_merged.withColumn("Age", \
              when(dog_dropdata_merged["Age"] <= 0, -1).otherwise(dog_dropdata_merged["Age"]))


In [11]:
dog_changeAge_merged.filter("Age <= 0").count()

12176

In [12]:
dog_changeAge_merged.filter("Age = -1").count()

12176

In [13]:
dog_changeAge_merged.filter("Age = -1").select("Age").sort("Age", ascending=False).show()

+---+
|Age|
+---+
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
| -1|
+---+
only showing top 20 rows



In [14]:
dog_missClass_merged = dog_changeAge_merged.na.fill('-1', subset=['Classification_Date'])
dog_missClass_merged.select("Classification_Date").head()


Row(Classification_Date='-1')

In [15]:
dog_missClass_merged.select("Classification_Date").distinct().orderBy("Classification_Date").show(n=999999, truncate=True)

+-------------------+
|Classification_Date|
+-------------------+
|                 -1|
|          1/10/2017|
|          1/11/2006|
|          1/11/2008|
|          1/11/2011|
|          1/12/2009|
|          1/12/2011|
|          1/13/2015|
|          1/14/2009|
|          1/14/2016|
|          1/14/2020|
|          1/16/2007|
|          1/17/2011|
|          1/18/2006|
|          1/18/2008|
|          1/18/2018|
|          1/19/2006|
|          1/19/2017|
|          1/19/2018|
|          1/20/2011|
|          1/21/2008|
|          1/21/2009|
|          1/21/2015|
|          1/22/2007|
|          1/22/2009|
|          1/23/2012|
|          1/23/2019|
|          1/24/2017|
|          1/24/2019|
|          1/25/2019|
|          1/27/2012|
|          1/27/2016|
|          1/29/2015|
|          1/29/2020|
|           1/3/2008|
|           1/3/2013|
|          1/30/2009|
|          1/30/2014|
|          1/31/2007|
|          1/31/2017|
|          1/31/2018|
|           1/4/2007|
|         

In [16]:
dog_missComplain_merged = dog_missClass_merged.na.fill('0', subset=['Total_Complains'])
dog_missComplain_merged.select("Total_Complains").distinct().orderBy("Total_Complains").show(n=999999, truncate=True)

+---------------+
|Total_Complains|
+---------------+
|              0|
|              1|
|             10|
|             11|
|             12|
|             13|
|             14|
|             15|
|             16|
|             17|
|             18|
|             19|
|              2|
|             20|
|             21|
|             22|
|             23|
|             24|
|             25|
|             26|
|             27|
|             28|
|             29|
|              3|
|             30|
|             31|
|             32|
|             33|
|             34|
|             35|
|             36|
|             37|
|             38|
|              4|
|             42|
|             43|
|              5|
|              6|
|              7|
|              8|
|              9|
+---------------+



In [17]:
from pyspark.sql.functions import when
dog_dropAge_merged = dog_missComplain_merged.filter(dog_missComplain_merged.Age > 0)
dog_dropAge_merged.select("Age").orderBy("Age").show()

+-----------+
|        Age|
+-----------+
|0.002739726|
|0.002739726|
|0.002739726|
|0.002739726|
|0.002739726|
|0.005479452|
|0.005479452|
|0.005479452|
|0.005479452|
|0.005479452|
|0.005479452|
|0.008219178|
|0.010958904|
|0.010958904|
|0.010958904|
|0.010958904|
| 0.01369863|
| 0.01369863|
|0.016438356|
|0.016438356|
+-----------+
only showing top 20 rows



In [18]:
dog_dropAge_merged.filter(dog_dropAge_merged.Age <=0).count()

0

In [19]:
dog_dropAge_merged.filter(dog_dropAge_merged.Animal_Sex == "U").count()

4

In [20]:
dog_dropsex_merged = dog_dropAge_merged.filter(dog_dropAge_merged.Animal_Sex != "U")
dog_dropsex_merged.filter(dog_dropAge_merged.Animal_Sex == "U").count()

0

In [21]:
dog_dropsex_merged.filter(dog_dropAge_merged.Animal_Sex != "U").count()

59248

In [22]:
dog_dropsex_merged.filter(dog_dropAge_merged.Animal_Sex != "M").count()

28659

In [23]:
dog_dropsex_merged.filter(dog_dropAge_merged.Animal_Sex != "F").count()

30589

In [24]:
dog_dropsex_merged.select("Primary_Breed_Code").distinct().orderBy("Primary_Breed_Code").show(n=999999, truncate=True)
dog_dropsex_merged.select("Secondary_Breed_Code").distinct().orderBy("Secondary_Breed_Code").show(n=999999, truncate=True)

+------------------+
|Primary_Breed_Code|
+------------------+
|              null|
|               1JC|
|               3RC|
|               5AT|
|               5HP|
|              ACAT|
|              ACOC|
|              AFFE|
|              AFGM|
|              AFHD|
|              AHWS|
|              AIRE|
|              AKIT|
|              AKOO|
|              AMAL|
|              ASTA|
|              ASTE|
|              ATER|
|              AUST|
|              BAJI|
|              BASS|
|              BBUL|
|              BDCO|
|              BDTE|
|              BEAG|
|              BEDT|
|              BERN|
|              BFDB|
|              BICH|
|              BLOO|
|              BORZ|
|              BOST|
|              BOUV|
|              BOXE|
|              BRAC|
|              BRIA|
|              BRSP|
|              BULA|
|              BUMA|
|              BUTE|
|              BUTM|
|              CALE|
|              CCRE|
|              CESK|
|            

In [25]:
dog_droppb_merged = dog_dropsex_merged.na.drop(subset="Primary_Breed_Code")
dog_droppb_merged.select("Primary_Breed_Code").distinct().orderBy("Primary_Breed_Code").show(n=999999, truncate=True)

+------------------+
|Primary_Breed_Code|
+------------------+
|               1JC|
|               3RC|
|               5AT|
|               5HP|
|              ACAT|
|              ACOC|
|              AFFE|
|              AFGM|
|              AFHD|
|              AHWS|
|              AIRE|
|              AKIT|
|              AKOO|
|              AMAL|
|              ASTA|
|              ASTE|
|              ATER|
|              AUST|
|              BAJI|
|              BASS|
|              BBUL|
|              BDCO|
|              BDTE|
|              BEAG|
|              BEDT|
|              BERN|
|              BFDB|
|              BICH|
|              BLOO|
|              BORZ|
|              BOST|
|              BOUV|
|              BOXE|
|              BRAC|
|              BRIA|
|              BRSP|
|              BULA|
|              BUMA|
|              BUTE|
|              BUTM|
|              CALE|
|              CCRE|
|              CESK|
|              CFOU|
|            

In [26]:
dog_droppb_merged.select("Primary_Colour_Code").distinct().orderBy("Primary_Colour_Code").show(n=999999, truncate=True)

+-------------------+
|Primary_Colour_Code|
+-------------------+
|                BDL|
|                BLK|
|                BLU|
|                BRO|
|                GLD|
|                GRY|
|                ONG|
|                RED|
|                TAN|
|                TRI|
|                WHI|
+-------------------+



In [27]:
from pyspark.sql.functions import isnan, when, count, col
dog_droppb_merged.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dog_droppb_merged.columns]).show()

+----------+------------------+--------------------+-------------------+---------------------+-------------+-------------+-----------------+---+----------+-------+------+--------------+-------------------+-------------------+--------------+---------------+
|Dog_Number|Primary_Breed_Code|Secondary_Breed_Code|Primary_Colour_Code|Secondary_Colour_Code|Date_Of_Birth|Date_Of_Death|Date_Of_Departure|Age|Animal_Sex|Desexed|Worker|Classification|Classification_Date|Classified_Date_Age|Microchip_Flag|Total_Complains|
+----------+------------------+--------------------+-------------------+---------------------+-------------+-------------+-----------------+---+----------+-------+------+--------------+-------------------+-------------------+--------------+---------------+
|         0|                 0|                   0|                  0|                    0|            0|        32996|            37408|  0|         0|      0|     0|             0|                  0|              57644|    

In [28]:
dog_droppb_merged.select("Primary_Colour_Code").distinct().show(n=999999, truncate=True)

+-------------------+
|Primary_Colour_Code|
+-------------------+
|                RED|
|                BLK|
|                ONG|
|                GLD|
|                TRI|
|                BRO|
|                BLU|
|                BDL|
|                WHI|
|                TAN|
|                GRY|
+-------------------+



In [29]:
dog_droppb_merged.select("Secondary_Colour_Code").distinct().show(n=999999, truncate=True)

+---------------------+
|Secondary_Colour_Code|
+---------------------+
|                  RED|
|                  BLK|
|                  ONG|
|                  GLD|
|                  TRI|
|                  BRO|
|                  BLU|
|                  BDL|
|                  WHI|
|                  TAN|
|                  GRY|
+---------------------+



In [30]:
dog_droppb_merged.groupBy("Primary_Colour_Code").count().show()

+-------------------+-----+
|Primary_Colour_Code|count|
+-------------------+-----+
|                RED| 2757|
|                BLK|21461|
|                ONG|  195|
|                GLD| 4239|
|                TRI| 3133|
|                BRO| 5341|
|                BLU|  430|
|                BDL| 3041|
|                WHI| 8811|
|                TAN| 8180|
|                GRY| 1540|
+-------------------+-----+



In [31]:
from pyspark.sql.functions import when

dog_pcchanged_merged = dog_droppb_merged.withColumn("Primary_Colour_Code", \
              when(dog_droppb_merged["Primary_Colour_Code"] == "BLK", "DARK").\
              when(dog_droppb_merged["Primary_Colour_Code"] == "TAN", "DARK").\
              when(dog_droppb_merged["Primary_Colour_Code"] == "BDL", "DARK").\
              otherwise("LIGHT"))


In [32]:
dog_pcchanged_merged.groupBy("Primary_Colour_Code").count().show()

+-------------------+-----+
|Primary_Colour_Code|count|
+-------------------+-----+
|               DARK|32682|
|              LIGHT|26446|
+-------------------+-----+



In [33]:
dog_pcchanged_merged.groupBy("Secondary_Colour_Code").count().show()

+---------------------+-----+
|Secondary_Colour_Code|count|
+---------------------+-----+
|                  RED| 1490|
|                  BLK| 9104|
|                  ONG|  251|
|                  GLD| 4203|
|                  TRI| 3154|
|                  BRO| 4251|
|                  BLU|  281|
|                  BDL| 2574|
|                  WHI|19629|
|                  TAN|12727|
|                  GRY| 1464|
+---------------------+-----+



In [34]:
from pyspark.sql.functions import when

dog_scchanged_merged = dog_pcchanged_merged.withColumn("Secondary_Colour_Code", \
              when(dog_droppb_merged["Secondary_Colour_Code"] == "BLK", "DARK").\
              when(dog_droppb_merged["Secondary_Colour_Code"] == "TAN", "DARK").\
              when(dog_droppb_merged["Secondary_Colour_Code"] == "BDL", "DARK").\
              otherwise("LIGHT"))

In [35]:
dog_scchanged_merged.groupBy("Secondary_Colour_Code").count().show()

+---------------------+-----+
|Secondary_Colour_Code|count|
+---------------------+-----+
|                 DARK|24405|
|                LIGHT|34723|
+---------------------+-----+



In [36]:
dog_scchanged_merged.groupBy("Classification").count().show()

+--------------+-----+
|Classification|count|
+--------------+-----+
|  Menacing Dog| 1458|
| Dangerous Dog|   61|
|Not Applicable|57609|
+--------------+-----+



In [37]:
from pyspark.sql.functions import when

dog_clachanged_merged = dog_scchanged_merged.withColumn("Classification", \
              when(dog_scchanged_merged["Classification"] == "Menacing Dog", "Unsafe").\
              when(dog_scchanged_merged["Classification"] == "Dangerous Dog", "Unsafe").\
              otherwise("Safe"))

In [38]:
dog_clachanged_merged.groupBy("Classification").count().show()

+--------------+-----+
|Classification|count|
+--------------+-----+
|          Safe|57609|
|        Unsafe| 1519|
+--------------+-----+



In [39]:
from pyspark.sql.types import DateType, IntegerType

dog_clachanged1_merged = dog_clachanged_merged.withColumn("Age", dog_clachanged_merged["Age"].cast(IntegerType()))


In [40]:
from pyspark.sql.functions import floor

dog_clachanged1_merged = dog_clachanged1_merged.withColumn("Age", floor(dog_clachanged1_merged.Age))

In [41]:
dog_clachanged1_merged.select("Age").show()

+---+
|Age|
+---+
| 10|
|  1|
|  2|
|  0|
| 13|
| 18|
|  7|
|  0|
| 10|
|  7|
|  3|
| 12|
| 13|
|  6|
| 13|
| 13|
|  5|
|  9|
| 15|
|  2|
+---+
only showing top 20 rows



In [42]:
dog_rawdata_merged.select("Age").show()

+-----------+
|        Age|
+-----------+
| 10.3369863|
|1.542465753|
|2.128767123|
|0.463013699|
|         13|
|18.28219178|
|7.956164384|
|0.221917808|
|10.37808219|
|7.769863014|
|3.432876712|
|12.98630137|
|13.54794521|
|6.147945205|
| 13.4109589|
|13.99178082|
|5.917808219|
|9.287671233|
|15.30684932|
|2.635616438|
+-----------+
only showing top 20 rows



In [43]:
dog_clachanged1_merged.select("Total_Complains","Age","Classification").show()

+---------------+---+--------------+
|Total_Complains|Age|Classification|
+---------------+---+--------------+
|              0| 10|          Safe|
|              4|  1|          Safe|
|              0|  2|          Safe|
|              2|  0|          Safe|
|              0| 13|          Safe|
|              0| 18|          Safe|
|              2|  7|          Safe|
|              1|  0|          Safe|
|              1| 10|          Safe|
|              2|  7|          Safe|
|              3|  3|          Safe|
|              1| 12|          Safe|
|              1| 13|          Safe|
|              0|  6|          Safe|
|              0| 13|          Safe|
|              0| 13|          Safe|
|              3|  5|          Safe|
|              1|  9|          Safe|
|              0| 15|          Safe|
|              0|  2|          Safe|
+---------------+---+--------------+
only showing top 20 rows



In [44]:
dog_orderchanged_merged = dog_clachanged1_merged.orderBy(["Total_Complains","Age","Classification"], ascending=True)
dog_orderchanged_merged.select("Total_Complains","Age","Classification").show()

+---------------+---+--------------+
|Total_Complains|Age|Classification|
+---------------+---+--------------+
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
|              0|  0|          Safe|
+---------------+---+--------------+
only showing top 20 rows



In [48]:
columns_to_drop = ["Date_Of_Birth","Date_Of_Death","Date_Of_Departure","Classification_Date","Classified_Date_Age"]
dog_drop_merged= dog_orderchanged_merged.drop(*columns_to_drop)
dog_drop_merged.show()

+----------+------------------+--------------------+-------------------+---------------------+---+----------+-------+------+--------------+--------------+---------------+
|Dog_Number|Primary_Breed_Code|Secondary_Breed_Code|Primary_Colour_Code|Secondary_Colour_Code|Age|Animal_Sex|Desexed|Worker|Classification|Microchip_Flag|Total_Complains|
+----------+------------------+--------------------+-------------------+---------------------+---+----------+-------+------+--------------+--------------+---------------+
|    149104|              TPOO|                MALT|               DARK|                LIGHT|  0|         F|      N|     N|          Safe|             Y|              0|
|    223413|              MAST|                CRXX|               DARK|                 DARK|  0|         M|      N|     N|          Safe|             Y|              0|
|    151093|              NEMA|                ROTT|               DARK|                 DARK|  0|         M|      N|     N|          Safe|      

In [49]:
columns_to_drop = ["Primary_Breed_Code","Secondary_Breed_Code"]
dog_drop_merged1= dog_drop_merged.drop(*columns_to_drop)
dog_drop_merged1.show()

+----------+-------------------+---------------------+---+----------+-------+------+--------------+--------------+---------------+
|Dog_Number|Primary_Colour_Code|Secondary_Colour_Code|Age|Animal_Sex|Desexed|Worker|Classification|Microchip_Flag|Total_Complains|
+----------+-------------------+---------------------+---+----------+-------+------+--------------+--------------+---------------+
|    149104|               DARK|                LIGHT|  0|         F|      N|     N|          Safe|             Y|              0|
|    223413|               DARK|                 DARK|  0|         M|      N|     N|          Safe|             Y|              0|
|    151093|               DARK|                 DARK|  0|         M|      N|     N|          Safe|             Y|              0|
|    223603|              LIGHT|                LIGHT|  0|         F|      N|     N|          Safe|             Y|              0|
|    193746|               DARK|                 DARK|  0|         M|      N|     N

In [51]:
dog_drop_merged1.repartition(1).write.format("com.databricks.spark.csv").option("header", "true")\
.save("Dog_registred_hamilton_new_v1_2_5.csv")