In [0]:
# Initial import
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Read a CSV file and let Spark infer the column types from the data.
# Inference means an extra pass over the file, so reserve it for small datasets.

mms_file = "/databricks-datasets/learning-spark-v2/mnm_dataset.csv"
mm_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load(mms_file)
)

In [0]:
# show() is an action: it runs the query and prints the first 10 rows.
mm_df.show(n=10)

+-----+------+-----+
|State| Color|Count|
+-----+------+-----+
|   TX|   Red|   20|
|   NV|  Blue|   66|
|   CO|  Blue|   79|
|   OR|  Blue|   71|
|   WA|Yellow|   93|
|   WY|  Blue|   16|
|   CA|Yellow|   53|
|   WA| Green|   60|
|   OR| Green|   71|
|   TX| Green|   68|
+-----+------+-----+
only showing top 10 rows



In [0]:
# Simple queries with the PySpark DataFrame API.
# Most SQL-style operations are available: where, select, groupBy, etc.
# Here: count the entries per (State, Color), largest groups first.
(
    mm_df
    .select("State", "Color", "Count")
    .groupBy("State", "Color")
    .agg(F.count("Count").alias("Total"))
    .orderBy(col("Total").desc())
    .show(10)
)

+-----+------+-----+
|State| Color|Total|
+-----+------+-----+
|   CA|Yellow| 1807|
|   WA| Green| 1779|
|   OR|Orange| 1743|
|   TX| Green| 1737|
|   TX|   Red| 1725|
|   CA| Green| 1723|
|   CO|Yellow| 1721|
|   CA| Brown| 1718|
|   CO| Green| 1713|
|   NV|Orange| 1712|
+-----+------+-----+
only showing top 10 rows



In [0]:
# Count the entries per color for California only, largest groups first.
# The filter runs before the projection: "State" is not among the selected
# columns, so filtering after select() would depend on Spark resolving a
# column that the projection has already dropped.
(mm_df
 .where(col("State") == "CA")
 .select("Color","Count")
 .groupBy("Color")
 .agg(F.count("Count").alias("Total"))
 .orderBy("Total",ascending=False)
).show(10)

+------+-----+
| Color|Total|
+------+-----+
|Yellow| 1807|
| Green| 1723|
| Brown| 1718|
|Orange| 1657|
|   Red| 1656|
|  Blue| 1603|
+------+-----+



In [0]:
# Project a SQL expression as a column: every Count value doubled.
mm_df.selectExpr("Count * 2").show(10)

+-----------+
|(Count * 2)|
+-----------+
|         40|
|        132|
|        158|
|        142|
|        186|
|         32|
|        106|
|        120|
|        142|
|        136|
+-----------+
only showing top 10 rows



In [0]:
# Queries can also be written in SQL: register the DataFrame as a temporary
# view named "mm_table_view" so that spark.sql(...) can refer to it.

mm_df.createOrReplaceTempView("mm_table_view")

In [0]:
# Classic SQL syntax against the temp view: average Count per (State, Color),
# restricted to rows whose Color is 'Yellow'.
spark.sql("""

SELECT State,Color,AVG(Count)
FROM mm_table_view
WHERE Color = 'Yellow' 
GROUP BY State,Color

""").show()

+-----+------+------------------+
|State| Color|        avg(Count)|
+-----+------+------------------+
|   NM|Yellow| 54.94490521327014|
|   UT|Yellow|54.263829787234044|
|   AZ|Yellow| 54.98548972188634|
|   CO|Yellow| 55.22254503195816|
|   CA|Yellow|  55.8693967902601|
|   NV|Yellow|54.561194029850746|
|   TX|Yellow| 55.09042865531415|
|   WY|Yellow|53.997539975399754|
|   OR|Yellow| 54.60285006195787|
|   WA|Yellow|  55.8749248346362|
+-----+------+------------------+



In [0]:
# Build a small DataFrame by hand: a list of Row objects plus the column names.

rows = [
    Row("mario", "CA"),
    Row("luigi", "AZ"),
]
peoples_df = spark.createDataFrame(data=rows, schema=["Name", "State"])
peoples_df.show()

+-----+-----+
| Name|State|
+-----+-----+
|mario|   CA|
|luigi|   AZ|
+-----+-----+



In [0]:
# For larger datasets, declare the schema explicitly instead of inferring it.
# Every field is nullable; only name and type vary, so the StructFields are
# generated from (name, type) pairs kept in column order.

fire_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("CallNumber", IntegerType()),
        ("UnitID", StringType()),
        ("IncidentNumber", IntegerType()),
        ("CallType", StringType()),
        ("CallDate", StringType()),
        ("WatchDate", StringType()),
        ("CallFinalDisposition", StringType()),
        ("AvailableDtTm", StringType()),
        ("Address", StringType()),
        ("City", StringType()),
        ("Zipcode", IntegerType()),
        ("Battalion", StringType()),
        ("StationArea", StringType()),
        ("Box", StringType()),
        ("OriginalPriority", StringType()),
        ("Priority", StringType()),
        ("FinalPriority", IntegerType()),
        ("ALSUnit", BooleanType()),
        ("CallTypeGroup", StringType()),
        ("NumAlarms", IntegerType()),
        ("UnitType", StringType()),
        ("UnitSequenceInCallDispatch", IntegerType()),
        ("FirePreventionDistrict", StringType()),
        ("SupervisorDistrict", StringType()),
        ("Neighborhood", StringType()),
        ("Location", StringType()),
        ("RowID", StringType()),
        ("Delay", FloatType()),
    ]
])

In [0]:
# Alternative way to load a CSV: the csv() shortcut on the reader, here with
# the explicit schema instead of inference.

sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"
sample_df = spark.read.csv(
    path=sf_fire_file,
    schema=fire_schema,
    header=True,
)

In [0]:
# Preview the first 10 rows of the fire-calls data.
sample_df.show(n=10)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+------------+--------------------------+----------------------+------------------+---------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|    UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|   Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+

In [0]:
# Select a single column. The name is spelled exactly as in fire_schema
# ("UnitID") so the lookup also works when spark.sql.caseSensitive is enabled.
sample_df.select(col("UnitID")).show(10)

+------+
|UnitId|
+------+
|   M29|
|   M08|
|   B02|
|   B04|
|    D2|
|   E03|
|   E38|
|   E41|
|   M03|
|   RS1|
+------+
only showing top 10 rows



In [0]:
# count() is an action: it runs a job and returns the number of rows in sample_df.
sample_df.count()

Out[54]: 4380660

In [0]:
# Number of distinct call types, skipping rows where CallType is null.
(
    sample_df
    .select("CallType")
    .filter(col("CallType").isNotNull())
    .agg(countDistinct("CallType").alias("DistinctCallTypes"))
    .show()
)

+-----------------+
|DistinctCallTypes|
+-----------------+
|               32|
+-----------------+



In [0]:
# Jobs over the full table are slow, so limit() restricts the query to 10,000 rows.
# String columns are parsed into timestamps with to_timestamp and a format pattern;
# the parsed and the raw columns are shown side by side.
(
    sample_df
    .limit(10_000)
    .withColumn("call_date_formatted", to_timestamp("CallDate", "MM/dd/yyyy"))
    .withColumn("available_dtm", to_timestamp("AvailableDtTm", "MM/dd/yyyy hh:mm:ss a"))
    .select("call_date_formatted", "CallDate", "available_dtm", "AvailableDtTm")
    .show(10)
)

+-------------------+--------------------+
|      available_dtm|       AvailableDtTm|
+-------------------+--------------------+
|2002-01-11 01:58:43|01/11/2002 01:58:...|
|2002-01-11 02:10:17|01/11/2002 02:10:...|
|2002-01-11 01:47:00|01/11/2002 01:47:...|
|2002-01-11 01:51:54|01/11/2002 01:51:...|
|2002-01-11 01:47:00|01/11/2002 01:47:...|
|2002-01-11 01:47:00|01/11/2002 01:47:...|
|2002-01-11 01:51:17|01/11/2002 01:51:...|
|2002-01-11 01:47:00|01/11/2002 01:47:...|
|2002-01-11 01:46:38|01/11/2002 01:46:...|
|2002-01-11 01:46:57|01/11/2002 01:46:...|
+-------------------+--------------------+
only showing top 10 rows



In [0]:
# Aggregate over the first 10,000 rows only, to keep the job short.
# F.sum / F.avg are referenced through the module alias so the cell does not
# depend on the star import having replaced Python's builtin sum().
# NOTE(review): "Priority" is declared as StringType in fire_schema, so avg()
# relies on an implicit cast; presumably non-numeric values are skipped — confirm.
(
    sample_df
        .limit(10_000)
        .select(F.sum("IncidentNumber"), F.avg("Priority"))
        .show()
)

+-------------------+-----------------+
|sum(IncidentNumber)|    avg(Priority)|
+-------------------+-----------------+
|        20058836634|2.804600061607968|
+-------------------+-----------------+



In [0]:
# Path of the CSV file that is read into df_it_compay further down.
it_file = "/FileStore/tables/redo.csv"


In [0]:
# Explicit schema for another dataset, used when reading it_file.
# All six fields are declared non-nullable.
# NOTE(review): Spark's file readers may relax user-declared nullability — confirm
# before relying on it.

schema = (
    StructType()
    .add("id", IntegerType(), False)
    .add("name", StringType(), False)
    .add("surname", StringType(), False)
    .add("departement", StringType(), False)
    .add("salary", IntegerType(), False)
    .add("skills", StringType(), False)
)



In [0]:
# Load the file with the explicit schema; the first line is treated as data, not as a header.
df_it_compay = spark.read.csv(
    path=it_file,
    schema=schema,
    header=False,
)

In [0]:
# Preview the first 10 rows.
df_it_compay.show(n=10)

+---+---------+---------+-----------+------+--------------------+
| id|     name|  surname|departement|salary|              skills|
+---+---------+---------+-----------+------+--------------------+
|  0|    Carri| Stogginb|          C|   439|     C#  Java  C/C++|
|  1|  Trethaw|  Adestic|          C|   843|    C#  Java  Python|
|  2|Upigniona|Sleniator|          B|   256|                Java|
|  3|   Psater| Easephif|          D|   697|    Storm  R  Python|
|  4|Tentestes|Ieutimena|          D|   251|        Java  Python|
|  5|   Furron|  Asmahed|          D|  2745|C/C++  Java  Spar...|
|  6|     Anca|    Ardas|          A|   318|               C/C++|
|  7|     Arym| Atinquis|          C|  2786|       Spark  Python|
|  8|Halarstar|    Cloap|          C|   538|               C#  R|
|  9|Molotonla|    Strou|          B|   607|   C#  C/C++  Python|
+---+---------+---------+-----------+------+--------------------+
only showing top 10 rows



In [0]:
# Turn the double-space separated "skills" string into an array of skills.
# The cell reassigns df_it_compay, so the type check keeps it safe to re-run:
# split() only accepts a string column, and a second execution would otherwise
# fail on the already-converted array column.
if isinstance(df_it_compay.schema["skills"].dataType, StringType):
    df_it_compay = df_it_compay.withColumn("skills", split(col("skills"), "  "))

In [0]:
# Average salary per department, highest first.
(
    df_it_compay
    .groupBy("departement")
    .agg(F.avg("salary").alias("avg_salary"))
    .orderBy("avg_salary", ascending=False)
    .show()
)

+-----------+------------------+
|departement|        avg_salary|
+-----------+------------------+
|          A| 1965.374156219865|
|          B|1465.6964636542239|
|          C|1076.5136940547761|
|          D| 720.2936689549962|
+-----------+------------------+



In [0]:
# Average salary per skill, highest first.
# explode() produces one output row per element of the "skills" column.
(
    df_it_compay
    .withColumn("skills", F.explode(col("skills")))
    .groupBy("skills")
    .agg(F.avg("salary").alias("avg_salary"))
    .orderBy("avg_salary", ascending=False)
    .show()
)

+----------+------------------+
|    skills|        avg_salary|
+----------+------------------+
|     Spark|   2857.2567018684|
|Javascript|1224.0312829525483|
|         R|1198.6492248062016|
|        C#| 1185.121782178218|
|       PHP|1180.7366511145672|
|    Python| 1171.110342920354|
|     C/C++|1159.8900634249471|
|      Java|1148.0251847796333|
|      Hive| 891.3939899833055|
|    Hadoop| 889.2778702163062|
|     Storm| 881.1098265895954|
|       MPI|  873.804347826087|
+----------+------------------+



In [0]:
# Average salary per (department, skill) pair, highest first.
(
    df_it_compay
    .withColumn("skills", F.explode(col("skills")))
    .groupBy("departement", "skills")
    .agg(F.avg("salary").alias("avg_salary"))
    .orderBy("avg_salary", ascending=False)
    .show()
)

+-----------+----------+------------------+
|departement|    skills|        avg_salary|
+-----------+----------+------------------+
|          A|     Spark| 5135.439516129032|
|          B|     Spark| 3806.163967611336|
|          C|     Spark| 2774.303934871099|
|          A|         R|           2227.06|
|          A|        C#| 2194.682926829268|
|          A|Javascript|  2193.37012987013|
|          A|     C/C++|  2059.30407523511|
|          A|      Java|1997.2983870967741|
|          A|       PHP|1996.4502369668246|
|          A|    Python|1994.8208556149732|
|          D|     Spark| 1867.822990844354|
|          B|       PHP|1697.7726098191215|
|          B|         R|1687.2376237623762|
|          A|    Hadoop|         1677.8125|
|          B|Javascript|1627.7692307692307|
|          A|     Storm|1587.1447368421052|
|          A|       MPI| 1582.295081967213|
|          A|      Hive|1581.4307692307693|
|          B|     C/C++| 1555.955357142857|
|          B|        C#|1537.546

In [0]:
# Highest salary per (skill, department) pair, largest first.
# The aggregate keeps Spark's default column name "max(salary)".
(
    df_it_compay
    .withColumn("skills", F.explode(col("skills")))
    .groupBy("skills", "departement")
    .agg(F.max("salary"))
    .orderBy(col("max(salary)").desc())
    .show()
)

+----------+-----------+-----------+
|    skills|departement|max(salary)|
+----------+-----------+-----------+
|     Spark|          A|       8487|
|        C#|          A|       8487|
|       PHP|          A|       8487|
|    Python|          A|       8487|
|     C/C++|          A|       8376|
|         R|          A|       8376|
|      Java|          A|       8326|
|Javascript|          A|       8082|
|      Java|          B|       6487|
|     Spark|          B|       6487|
|       PHP|          B|       6487|
|    Python|          B|       6487|
|     C/C++|          B|       6472|
|         R|          B|       6472|
|Javascript|          B|       6367|
|        C#|          B|       6238|
|Javascript|          C|       4641|
|     Spark|          C|       4641|
|      Java|          C|       4641|
|    Python|          C|       4641|
+----------+-----------+-----------+
only showing top 20 rows



In [0]:
# Hand the exploded frame (one row per employee and skill) to the notebook's display() helper.
display(
    df_it_compay.withColumn("skills", F.explode(col("skills")))
)

id,name,surname,departement,salary,skills
0,Carri,Stogginb,C,439,C#
0,Carri,Stogginb,C,439,Java
0,Carri,Stogginb,C,439,C/C++
1,Trethaw,Adestic,C,843,C#
1,Trethaw,Adestic,C,843,Java
1,Trethaw,Adestic,C,843,Python
2,Upigniona,Sleniator,B,256,Java
3,Psater,Easephif,D,697,Storm
3,Psater,Easephif,D,697,R
3,Psater,Easephif,D,697,Python


Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks