In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [4]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x0000011C5311DDD8>


In [6]:
df = spark.read.csv("D:/Spark/dataset/driver-license-permit-and-non-driver-identification-cards-issued-as-of-august-30-2017.csv", header=True, inferSchema=True)

In [7]:
df.show()

+-------------+---+---------------+-----+-----+----------------+-------------+---------+---------+------------------+
|Year of Birth|Sex|           City|State|  Zip|Residence County|License Class|   Status|Privilege|Year of Expiration|
+-------------+---+---------------+-----+-----+----------------+-------------+---------+---------+------------------+
|         1950|  F|       GLENMONT|   NY|12077|          ALBANY|            D|    VALID|     FULL|              2020|
|         1977|  F| WEST HEMPSTEAD|   NY|11552|          NASSAU|            D|    VALID|     FULL|              2019|
|         1967|  M|      SMITHTOWN|   NY|11787|         SUFFOLK|            D|    VALID|     FULL|              2019|
|         1989|  M|       BROOKLYN|   NY|11213|           KINGS|            D|    VALID|     FULL|              2020|
|         1994|  F|        BXVILLE|   NY|10708|     WESTCHESTER|            D|    VALID|     FULL|              2023|
|         1960|  M|       NEW YORK|   NY|10025|        N

In [8]:
df.schema

StructType(List(StructField(Year of Birth,IntegerType,true),StructField(Sex,StringType,true),StructField(City,StringType,true),StructField(State,StringType,true),StructField(Zip,StringType,true),StructField(Residence County,StringType,true),StructField(License Class,StringType,true),StructField(Status,StringType,true),StructField(Privilege,StringType,true),StructField(Year of Expiration,IntegerType,true)))

In [9]:
df.count()

15903561

In [10]:
df.createOrReplaceTempView("License")

In [11]:
result = spark.sql("SELECT DISTINCT State FROM License")

In [12]:
result.show()

+-----+
|State|
+-----+
|   AZ|
|   SC|
|   NS|
|   LA|
|   MN|
|   NK|
|   AA|
|   NJ|
|   DC|
|   GL|
|   CN|
|   OR|
|   NT|
|   VA|
|   QC|
|   RI|
|   KY|
|   WY|
|   BC|
|   NH|
+-----+
only showing top 20 rows



In [44]:
#Jumlah License di New York berdasarkan Licence Class nya

query1 = spark.sql("SELECT `License Class`, COUNT(`License class`) AS Jumlah \
                    FROM License \
                    WHERE City='NEW YORK' \
                    GROUP BY `License Class`")

In [45]:
query1.show()

+----------------+------+
|   License Class|Jumlah|
+----------------+------+
| E WITH A PERMIT|    67|
|              AM|   401|
| A WITH M PERMIT|    24|
| B WITH B PERMIT|     6|
| M WITH D PERMIT|    71|
| D WITH M PERMIT|   743|
|        B PERMIT|     1|
|        E PERMIT|    15|
|               E| 52062|
|EM WITH B PERMIT|     3|
|               B|  5733|
| N WITH M PERMIT|     4|
| D WITH A PERMIT|    66|
|CM WITH A PERMIT|     1|
| D WITH B PERMIT|   139|
|              BM|   532|
|               M|   240|
|BM WITH B PERMIT|     1|
|               D|732524|
|EM WITH A PERMIT|     2|
+----------------+------+
only showing top 20 rows



In [56]:
#Jumlah License yang expired pada tahun 2019 dan urutkan dari yang paling banyak
query2 = spark.sql("SELECT `City`, COUNT(`Year of Expiration`) AS Jumlah \
                    FROM License \
                    WHERE `Year of Expiration`=2019 \
                    GROUP BY `City` \
                    ORDER BY `Jumlah` DESC")

In [57]:
query2.show()

+-------------+------+
|         City|Jumlah|
+-------------+------+
|     BROOKLYN|316453|
|     NEW YORK|218912|
|        BRONX|173800|
|STATEN ISLAND| 82601|
|    ROCHESTER| 80796|
|      BUFFALO| 47242|
|     FLUSHING| 37047|
|     SYRACUSE| 31482|
|      JAMAICA| 30949|
|      YONKERS| 29345|
|       ALBANY| 21724|
|  SCHENECTADY| 20937|
|      ASTORIA| 17674|
| POUGHKEEPSIE| 14490|
| WHITE PLAINS| 13463|
|   BINGHAMTON| 12687|
|     ELMHURST| 12512|
| NEW ROCHELLE| 12102|
|VALLEY STREAM| 11836|
|     WOODSIDE| 11583|
+-------------+------+
only showing top 20 rows



In [61]:
#JUMLAH License yang berstatus valid yang pemegangnya adalah perempuan dan tinggal di Residence County NEW YORK
#Group by ZIP numbernya
query3 = spark.sql("SELECT `Zip`, COUNT(`Status`) AS Jumlah \
                    FROM License \
                    WHERE `Residence County`='NEW YORK' AND Status='VALID' AND Sex='F' \
                    GROUP BY `Zip` \
                    ORDER BY `Jumlah` DESC")

In [62]:
query3.show()

+-----+------+
|  Zip|Jumlah|
+-----+------+
|10025| 25897|
|10023| 20917|
|10128| 19995|
|10024| 19421|
|10002| 16954|
|10028| 15860|
|10021| 15452|
|10016| 14149|
|10009| 13898|
|10011| 13756|
|10003| 13580|
|10029| 13474|
|10027| 11547|
|10022| 10917|
|10019| 10883|
|10032| 10160|
|10031| 10063|
|10033|  9960|
|10065|  9953|
|10075|  9369|
+-----+------+
only showing top 20 rows



In [63]:
# Save the results QUERY 1 to CSV --> partitioned CSV
query1.write \
  .option("header", "true") \
  .csv("file:///D:/Spark/dataset/query1.csv")

In [64]:
# Convert to Pandas
import pandas as pd
query1Pandas = query1.toPandas()

In [65]:
# Save to single CSV
query1Pandas.to_csv("D:/Spark/dataset/query1Pandas.csv", index=False)

In [66]:
# Save the results QUERY 2 to CSV --> partitioned CSV
query2.write \
  .option("header", "true") \
  .csv("file:///D:/Spark/dataset/query2.csv")

In [67]:
# Convert to Pandas
import pandas as pd
query2Pandas = query2.toPandas()

In [68]:
# Save to single CSV
query2Pandas.to_csv("D:/Spark/dataset/query2Pandas.csv", index=False)

In [69]:
# Save the results QUERY 3 to CSV --> partitioned CSV
query2.write \
  .option("header", "true") \
  .csv("file:///D:/Spark/dataset/query3.csv")

In [70]:
# Convert to Pandas
import pandas as pd
query3Pandas = query3.toPandas()

In [71]:
# Save to single CSV
query3Pandas.to_csv("D:/Spark/dataset/query3Pandas.csv", index=False)