In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Create the session through the builder so this cell can be re-run safely:
# getOrCreate() hands back the already-running session instead of trying to
# start a second SparkContext, which Spark refuses within one driver process.
spark = SparkSession.builder.master("local").getOrCreate()
# Keep `sc` defined for any code that works with the SparkContext directly.
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/28 16:24:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/28 16:24:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Bare expression: lets the notebook render the SparkSession object created above.
spark

In [4]:
# Read the 2015 flight summary (JSON) and let Spark infer the schema.
df = spark.read.json("data/flight-data/json/2015-summary.json")

                                                                                

In [5]:
# Peek at the first four rows.
df.show(n=4)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
+-----------------+-------------------+-----+
only showing top 4 rows



In [6]:
# Print the schema Spark inferred from the JSON file (count comes back as long).
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [14]:
from pyspark.sql.types import StringType, StructField, StructType, LongType, IntegerType

In [15]:
# Hand-written schema for the flight summary data: two string columns plus an
# integer `count` that carries a small metadata dictionary.
# NOTE(review): `count` is declared non-nullable here, yet the printSchema()
# output after loading with this schema reports `nullable = true`, so the flag
# does not appear to be enforced by the JSON reader.
myManualSchema = (
    StructType()
    .add("DEST_COUNTRY_NAME", StringType(), nullable=True)
    .add("ORIGIN_COUNTRY_NAME", StringType(), nullable=True)
    .add("count", IntegerType(), nullable=False, metadata={"name": "Noli"})
)

In [16]:
# Reload the same file, this time applying the manual schema instead of inference.
# This rebinds `df`; every later cell works on the manually-typed frame.
df = spark.read.json(
    "data/flight-data/json/2015-summary.json",
    schema=myManualSchema,
)

In [17]:
# Check the manual schema took effect: `count` is now integer rather than the
# long that inference produced earlier.
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [29]:
from pyspark.sql.functions import col, column, expr

In [20]:
# Column names of the DataFrame, returned as a plain Python list.
df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [25]:
# Register `df` under the name "dfTable" so it can be referenced from Spark SQL.
# (No SQL query against the view appears in the cells visible here.)
df.createOrReplaceTempView("dfTable")

In [27]:
# Project a single column and show its first two values.
df.select(df["DEST_COUNTRY_NAME"]).show(n=2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [31]:
# Same projection, renamed to `destination` via a SQL expression string.
df.selectExpr("DEST_COUNTRY_NAME as destination").show(n=2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [32]:
# Rename both country columns in one selectExpr call.
df.selectExpr(
    "DEST_COUNTRY_NAME as country",
    "ORIGIN_COUNTRY_NAME as originCountry",
).show(n=2)

+-------------+-------------+
|      country|originCountry|
+-------------+-------------+
|United States|      Romania|
|United States|      Croatia|
+-------------+-------------+
only showing top 2 rows



In [44]:
# Add a boolean `sameCountry` column (destination equals origin) and list the
# rows with `true` first.
(
    df.selectExpr("*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as sameCountry")
    .orderBy("sameCountry", ascending=False)
    .show(n=20)
)

+--------------------+-------------------+------+-----------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|sameCountry|
+--------------------+-------------------+------+-----------+
|       United States|      United States|370002|       true|
|       United States|            Romania|    15|      false|
|       United States|            Croatia|     1|      false|
|       United States|            Ireland|   344|      false|
|               Egypt|      United States|    15|      false|
|       United States|              India|    62|      false|
|       United States|          Singapore|     1|      false|
|       United States|            Grenada|    62|      false|
|          Costa Rica|      United States|   588|      false|
|             Senegal|      United States|    40|      false|
|             Moldova|      United States|     1|      false|
|       United States|       Sint Maarten|   325|      false|
|       United States|   Marshall Islands|    39|      false|
|       

In [48]:
# Whole-frame aggregates: mean of `count` and number of distinct destinations.
df.selectExpr(
    "avg(count)",
    "count(distinct(DEST_COUNTRY_NAME))",
).show(n=10)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [49]:
from pyspark.sql.functions import lit

In [50]:
# Keep every existing column and append a constant column named `One`.
df.select("*", lit(1).alias("One")).show(n=5)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
|    United States|            Ireland|  344|  1|
|            Egypt|      United States|   15|  1|
|    United States|              India|   62|  1|
+-----------------+-------------------+-----+---+
only showing top 5 rows



In [51]:
# Append a constant column via withColumn; the column name contains a space.
df.withColumn(
    "adding one",
    lit(1),
).show(n=5)

+-----------------+-------------------+-----+----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|adding one|
+-----------------+-------------------+-----+----------+
|    United States|            Romania|   15|         1|
|    United States|            Croatia|    1|         1|
|    United States|            Ireland|  344|         1|
|            Egypt|      United States|   15|         1|
|    United States|              India|   62|         1|
+-----------------+-------------------+-----+----------+
only showing top 5 rows



In [53]:
# Rename DEST_COUNTRY_NAME to `destination` for display; `df` itself is unchanged.
(
    df
    .withColumnRenamed("DEST_COUNTRY_NAME", "destination")
    .show(n=10)
)

+-------------+-------------------+-----+
|  destination|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|            Romania|   15|
|United States|            Croatia|    1|
|United States|            Ireland|  344|
|        Egypt|      United States|   15|
|United States|              India|   62|
|United States|          Singapore|    1|
|United States|            Grenada|   62|
|   Costa Rica|      United States|  588|
|      Senegal|      United States|   40|
|      Moldova|      United States|    1|
+-------------+-------------------+-----+
only showing top 10 rows



In [59]:
# Derive `count2` = count (cast to long) + 50 alongside the original columns.
df.withColumn(
    "count2",
    col("count").cast("long") + 50,
).show(n=10)

+-----------------+-------------------+-----+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count2|
+-----------------+-------------------+-----+------+
|    United States|            Romania|   15|    65|
|    United States|            Croatia|    1|    51|
|    United States|            Ireland|  344|   394|
|            Egypt|      United States|   15|    65|
|    United States|              India|   62|   112|
|    United States|          Singapore|    1|    51|
|    United States|            Grenada|   62|   112|
|       Costa Rica|      United States|  588|   638|
|          Senegal|      United States|   40|    90|
|          Moldova|      United States|    1|    51|
+-----------------+-------------------+-----+------+
only showing top 10 rows



In [64]:
# Keep only routes with more than 13 flights.
df.where(col("count") > 13).show(n=10)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|    United States|       Sint Maarten|  325|
|    United States|   Marshall Islands|   39|
|           Guyana|      United States|   64|
+-----------------+-------------------+-----+
only showing top 10 rows



In [66]:
# Keep only routes with more than 155 flights.
df.filter(col("count") > 155).show(n=10)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Ireland|  344|
|          Costa Rica|      United States|  588|
|       United States|       Sint Maarten|  325|
|Turks and Caicos ...|      United States|  230|
|               Italy|      United States|  382|
|       United States|             Russia|  161|
|       United States|        Netherlands|  660|
|             Iceland|      United States|  181|
|            Honduras|      United States|  362|
|         The Bahamas|      United States|  955|
+--------------------+-------------------+-----+
only showing top 10 rows



In [67]:
# Rows with a positive count whose destination is Kosovo, written as a single
# combined predicate.
df.where(
    (col("count") > 0) & (col("DEST_COUNTRY_NAME") == "Kosovo")
).show(n=5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Kosovo|      United States|    1|
+-----------------+-------------------+-----+



In [72]:
# Number of distinct destination countries.
df.select("DEST_COUNTRY_NAME").distinct().count()

132

In [73]:
# Number of partitions of the RDD underlying `df`.
df.rdd.getNumPartitions()

1

In [74]:
# Repartition into 5 partitions keyed on destination, coalesce down to 2, and
# show two rows. The result is only displayed, not assigned back to `df`.
df.repartition(5, "DEST_COUNTRY_NAME").coalesce(2).show(n=2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|          Bolivia|      United States|   30|
+-----------------+-------------------+-----+
only showing top 2 rows



In [75]:
# Still reports the partition count of the original `df`: the repartitioned /
# coalesced frame from the previous cell was never assigned, so `df` is unchanged.
df.rdd.getNumPartitions()

1