. Big data refers to extremely large sets of structured and unstructured data that cannot be handled with traditional, single-machine methods.
. Apache Spark is a unified computing engine and a set of libraries for parallel data processing on
computer clusters.
. We use Apache Spark for high-speed querying, analysis, and transformation of large data sets, that is, big data.

. Spark Applications consist of a driver process and a set of executor processes. The driver process
runs your main() function, sits on a node in the cluster, and is responsible for three things:
maintaining information about the Spark Application; responding to the user’s program or input;
and analyzing, distributing, and scheduling work across the executors.
The driver process is essential: it is the heart of a Spark Application and maintains all
relevant information during the lifetime of the application.
The executors carry out the work that the driver assigns them. Each executor is responsible
for only two things: executing the code the driver assigns to it, and reporting the state of
the computation on that executor back to the driver node.
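The driver/executor split can be mimicked in plain Python as a rough analogy. This is not Spark code: a hypothetical `driver` function cuts a job into tasks, hands them to a `ThreadPoolExecutor` that stands in for the executors, and combines the partial results they report back. Real executors are separate JVM processes on cluster nodes, so treat the thread pool purely as an illustrative assumption.

```python
from concurrent.futures import ThreadPoolExecutor


def executor_task(chunk):
    """Work one 'executor' performs on the chunk it was given: sum the numbers."""
    return sum(chunk)


def driver(data, num_executors=3, num_tasks=6):
    """Toy 'driver': split the job into tasks, schedule them on a worker pool,
    and combine the partial results the workers report back."""
    size = max(1, -(-len(data) // num_tasks))  # ceiling division, at least 1
    tasks = [data[i:i + size] for i in range(0, len(data), size)]
    with ThreadPoolExecutor(max_workers=num_executors) as pool:
        partial_results = list(pool.map(executor_task, tasks))  # "executors" report back
    return sum(partial_results)  # the driver combines the partial results


print(driver(list(range(500))))  # 124750
```

Here the "job" is summing `range(500)`, the same numbers held by the `spark.range(500)` DataFrame used in the cells below.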

. We control a Spark Application through the SparkSession, the entry-point object that lives in the driver process and turns our DataFrame and SQL operations into work on the cluster.

. A DataFrame is the most common Structured API and simply represents a table of data with
rows and named, typed columns (its schema).

. To allow every executor to perform work in parallel, Spark breaks up the data into chunks called
partitions. A partition is a collection of rows that sit on one physical machine in your cluster. A
DataFrame’s partitions represent how the data is physically distributed across the cluster of
machines during execution.
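A minimal plain-Python sketch of why partitions matter, assuming rows are dealt out round-robin (Spark's actual placement depends on the data source and on any repartitioning). Each hypothetical partition computes a local count per key, and the partial counts are then merged, which is roughly how a grouped count can run in parallel across executors.

```python
from collections import Counter


def split_into_partitions(rows, num_partitions):
    """Deal rows round-robin into num_partitions lists (stand-ins for physical partitions)."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


def count_by_key(partitions, key):
    """Each partition counts its own rows per key; the partial counts are then merged."""
    partials = [Counter(row[key] for row in part) for part in partitions]  # parallelizable
    total = Counter()
    for partial in partials:  # merge step
        total.update(partial)
    return dict(total)


rows = [{"customer_id": c} for c in [44, 35, 2, 47, 44, 2, 2]]
parts = split_into_partitions(rows, 3)
print([len(p) for p in parts])  # [3, 2, 2]
print(count_by_key(parts, "customer_id"))
```

The merged result does not depend on how rows were split, which is what lets Spark process each partition independently.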

. DataFrames are immutable: once created, they cannot be modified in place. To “change” a DataFrame,
you instruct Spark how you would like to modify it, and Spark returns a new DataFrame. These
instructions are called transformations.

. Lazy evaluation means that Spark waits until the very last moment to execute the graph of
computation instructions. Instead of modifying the data immediately when you express
an operation, you build up a plan of transformations that you would like to apply to your
source data. Execution starts only when you call an action such as show(), count(), or collect().
By waiting until that point, Spark can compile the plan from your raw DataFrame transformations
into a streamlined physical plan that runs as efficiently as possible across the cluster.
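The idea of building a plan first and running it only when an action is called can be sketched with a toy class. `LazyFrame` is hypothetical and far simpler than Spark's optimizer: `map` and `filter` only append steps to a plan, and `collect` (the action) runs all recorded steps in one pass over the data.

```python
class LazyFrame:
    """Toy lazy dataset: transformations record steps; only collect() executes them."""

    def __init__(self, source, plan=None):
        self._source = source
        self._plan = plan or []  # list of ("filter" | "map", function) steps

    def filter(self, predicate):
        # Transformation: return a new LazyFrame with one more step; nothing runs yet
        return LazyFrame(self._source, self._plan + [("filter", predicate)])

    def map(self, func):
        # Transformation: also just extends the plan
        return LazyFrame(self._source, self._plan + [("map", func)])

    def collect(self):
        """Action: apply every recorded step to each value in a single pass."""
        result = []
        for value in self._source:
            keep = True
            for kind, fn in self._plan:
                if kind == "filter" and not fn(value):
                    keep = False
                    break
                if kind == "map":
                    value = fn(value)
            if keep:
                result.append(value)
        return result


pipeline = LazyFrame(range(10)).map(lambda x: x + 10).filter(lambda x: x % 2 == 0)
print(pipeline.collect())  # [10, 12, 14, 16, 18]
```

Because every step is known before any data is touched, the steps can be fused into one pass; Spark applies the same principle with a much richer optimizer.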

In [1]:
import sys
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import IntegerType, LongType, StructField, StructType, StringType, FloatType

In [3]:
# Make the driver and the executors use the same Python interpreter as this notebook
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [4]:
spark = SparkSession.builder.appName("Basic Structured Operation").getOrCreate()
df = spark.range(500).toDF("numbers")


In [5]:
df.select(df["numbers"] + 10).show(5)

+--------------+
|(numbers + 10)|
+--------------+
|            10|
|            11|
|            12|
|            13|
|            14|
+--------------+
only showing top 5 rows

In [6]:
spark.range(2).collect()

[Row(id=0), Row(id=1)]

# Creating a DataFrame

In [7]:
customer_df = spark.read.format("csv").load("customer-orders.csv")

In [8]:
customer_df.show(10)

+---+----+-----+
|_c0| _c1|  _c2|
+---+----+-----+
| 44|8602|37.19|
| 35|5368|65.89|
|  2|3391|40.64|
| 47|6694|14.98|
| 29| 680|13.08|
| 91|8900|24.59|
| 70|3959|68.68|
| 85|1733|28.53|
| 53|9900|83.55|
| 14|1505| 4.32|
+---+----+-----+
only showing top 10 rows



# Schema

In [9]:
customer_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)



In [10]:
spark.read.format("csv").load("customer-orders.csv").schema

StructType([StructField('_c0', StringType(), True), StructField('_c1', StringType(), True), StructField('_c2', StringType(), True)])

In [11]:
mySchema = StructType([
    StructField("customer_id" , IntegerType() , True),
    StructField("item_id", IntegerType() , True),
    StructField("Amount" , FloatType() , True)
])

In [12]:
customer_new_df = spark.read.schema(mySchema).csv("customer-orders.csv")

In [13]:
customer_new_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- Amount: float (nullable = true)
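As the schemas above show, reading the CSV without a schema leaves every column as a string, while `mySchema` makes Spark convert the fields. A rough plain-Python sketch of that conversion follows; `parse_with_schema` is hypothetical, and turning unparseable fields into `None` only loosely mirrors the null Spark produces in its default PERMISSIVE mode.

```python
SCHEMA = [("customer_id", int), ("item_id", int), ("Amount", float)]


def parse_with_schema(line, schema=SCHEMA):
    """Split one CSV line and cast each field to its declared type.
    Missing or unparseable values become None (Spark would show null)."""
    fields = line.split(",")
    row = {}
    for i, (name, cast) in enumerate(schema):
        raw = fields[i].strip() if i < len(fields) else ""
        try:
            row[name] = cast(raw) if raw else None
        except ValueError:
            row[name] = None  # malformed value for this type
    return row


print(parse_with_schema("44,8602,37.19"))
# {'customer_id': 44, 'item_id': 8602, 'Amount': 37.19}
```

Python's `float` is 64-bit, whereas Spark's `FloatType` is 32-bit, which is why `first()` below shows `Amount=37.189998626708984` instead of `37.19`; declaring the column as `DoubleType()` would avoid that rounding.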



# Columns

In [14]:
from pyspark.sql import functions as func

In [15]:
customer_new_df.customer_id

Column<'customer_id'>

# Rows

In [16]:
customer_new_df.first()

Row(customer_id=44, item_id=8602, Amount=37.189998626708984)

In [17]:
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

In [18]:
myRow

<Row('Hello', None, 1, False)>

In [19]:
myRow[0]

'Hello'

In [20]:
myRow[2]

1

# Creating a DataFrame Manually

In [21]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
    StructField("some", StringType(), True),
    StructField("col", StringType(), True),
    StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)

# Select and SelectExpr

In [22]:
customer_new_df.select("customer_id").show(2)

+-----------+
|customer_id|
+-----------+
|         44|
|         35|
+-----------+
only showing top 2 rows



In [23]:
customer_new_df.select("customer_id" , "item_id").show(20)

+-----------+-------+
|customer_id|item_id|
+-----------+-------+
|         44|   8602|
|         35|   5368|
|          2|   3391|
|         47|   6694|
|         29|    680|
|         91|   8900|
|         70|   3959|
|         85|   1733|
|         53|   9900|
|         14|   1505|
|         51|   3378|
|         42|   6926|
|          2|   4424|
|         79|   9291|
|         50|   3901|
|         20|   6633|
|         15|   6148|
|         44|   8331|
|          5|   3505|
|         48|   5539|
+-----------+-------+
only showing top 20 rows



In [24]:
customer_new_df.select(func.expr("customer_id as Customer_ID")).show(10)

+-----------+
|Customer_ID|
+-----------+
|         44|
|         35|
|          2|
|         47|
|         29|
|         91|
|         70|
|         85|
|         53|
|         14|
+-----------+
only showing top 10 rows



In [25]:
customer_new_df.show(10)

+-----------+-------+------+
|customer_id|item_id|Amount|
+-----------+-------+------+
|         44|   8602| 37.19|
|         35|   5368| 65.89|
|          2|   3391| 40.64|
|         47|   6694| 14.98|
|         29|    680| 13.08|
|         91|   8900| 24.59|
|         70|   3959| 68.68|
|         85|   1733| 28.53|
|         53|   9900| 83.55|
|         14|   1505|  4.32|
+-----------+-------+------+
only showing top 10 rows



In [26]:
customer_new_df.selectExpr("count(distinct(customer_id))" , "count(distinct(item_id))").show()

+---------------------------+-----------------------+
|count(DISTINCT customer_id)|count(DISTINCT item_id)|
+---------------------------+-----------------------+
|                        100|                   6369|
+---------------------------+-----------------------+



In [27]:
customer_new_df.selectExpr("sum(Amount)").show()

+------------------+
|       sum(Amount)|
+------------------+
|500489.18006796576|
+------------------+



# Adding Columns

In [28]:
from pyspark.sql.functions import lit
customer_new_df.withColumn("numberOne", lit(1)).show(10)

+-----------+-------+------+---------+
|customer_id|item_id|Amount|numberOne|
+-----------+-------+------+---------+
|         44|   8602| 37.19|        1|
|         35|   5368| 65.89|        1|
|          2|   3391| 40.64|        1|
|         47|   6694| 14.98|        1|
|         29|    680| 13.08|        1|
|         91|   8900| 24.59|        1|
|         70|   3959| 68.68|        1|
|         85|   1733| 28.53|        1|
|         53|   9900| 83.55|        1|
|         14|   1505|  4.32|        1|
+-----------+-------+------+---------+
only showing top 10 rows



In [29]:
customer_new_df.withColumn("customer_eq_item", func.expr("customer_id = item_id")).show(2)

+-----------+-------+------+----------------+
|customer_id|item_id|Amount|customer_eq_item|
+-----------+-------+------+----------------+
|         44|   8602| 37.19|           false|
|         35|   5368| 65.89|           false|
+-----------+-------+------+----------------+
only showing top 2 rows



# Renaming columns

In [30]:
customer_new_df.columns

['customer_id', 'item_id', 'Amount']

In [31]:
renamed_df = customer_new_df.withColumnRenamed("customer_id" , "Customer_ID")

In [32]:
renamed_df.columns

['Customer_ID', 'item_id', 'Amount']

# Changing a Column’s Type

In [33]:
renamed_df.withColumn("count", func.expr("100").cast("float")).show(5)

+-----------+-------+------+-----+
|Customer_ID|item_id|Amount|count|
+-----------+-------+------+-----+
|         44|   8602| 37.19|100.0|
|         35|   5368| 65.89|100.0|
|          2|   3391| 40.64|100.0|
|         47|   6694| 14.98|100.0|
|         29|    680| 13.08|100.0|
+-----------+-------+------+-----+
only showing top 5 rows



# Filtering Rows

In [34]:
customer_new_df.filter(func.col("Amount") > 50).show()

+-----------+-------+------+
|customer_id|item_id|Amount|
+-----------+-------+------+
|         35|   5368| 65.89|
|         70|   3959| 68.68|
|         53|   9900| 83.55|
|         42|   6926| 57.77|
|          2|   4424| 55.77|
|         15|   6148| 65.53|
|         44|   8331| 99.19|
|          5|   3505| 64.18|
|         36|   8274| 88.64|
|         57|    963| 57.91|
|         12|   4396| 72.62|
|         22|   7161| 56.06|
|          0|   3479| 97.22|
|         88|   1272|  80.7|
|         86|   9254|  71.9|
|         40|   3083| 72.95|
|         98|     30| 86.56|
|         51|   2187| 84.57|
|         91|   8363| 64.42|
|         14|   5388| 77.77|
+-----------+-------+------+
only showing top 20 rows



In [35]:
customer_new_df.where(func.col("Amount") > 50).where(func.col("customer_id") != 10).show()

+-----------+-------+------+
|customer_id|item_id|Amount|
+-----------+-------+------+
|         35|   5368| 65.89|
|         70|   3959| 68.68|
|         53|   9900| 83.55|
|         42|   6926| 57.77|
|          2|   4424| 55.77|
|         15|   6148| 65.53|
|         44|   8331| 99.19|
|          5|   3505| 64.18|
|         36|   8274| 88.64|
|         57|    963| 57.91|
|         12|   4396| 72.62|
|         22|   7161| 56.06|
|          0|   3479| 97.22|
|         88|   1272|  80.7|
|         86|   9254|  71.9|
|         40|   3083| 72.95|
|         98|     30| 86.56|
|         51|   2187| 84.57|
|         91|   8363| 64.42|
|         14|   5388| 77.77|
+-----------+-------+------+
only showing top 20 rows



# Getting Unique Rows

In [36]:
customer_new_df.select("customer_id").distinct().show()

+-----------+
|customer_id|
+-----------+
|         31|
|         85|
|         65|
|         53|
|         78|
|         34|
|         81|
|         28|
|         76|
|         27|
|         26|
|         44|
|         12|
|         91|
|         22|
|         93|
|         47|
|          1|
|         52|
|         13|
+-----------+
only showing top 20 rows



In [37]:
customer_new_df.select("customer_id").distinct().count()

100

# Random Samples and Random Splits

In [38]:
customer_new_df.count()

10000

In [39]:
seed = 25
fraction = 0.3
withreplacement = False
customer_new_df.sample(withreplacement, fraction , seed).count()

2975

In [40]:
dataframes = customer_new_df.randomSplit([0.8 , 0.2] , seed)
print(dataframes[0].count() , dataframes[1].count())

7967 2033
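`sample` does not return exactly `fraction * count()` rows: without replacement, each row is kept independently with probability `fraction`, which is why 0.3 of 10000 rows gave 2975 rather than 3000. Likewise, `randomSplit` normalizes its weights and assigns rows at random, so 0.8/0.2 produced 7967/2033. A plain-Python sketch of both ideas follows; the helper names are hypothetical and Spark's real implementation (per-partition samplers, its own seeding) differs, so the exact counts will not match Spark's.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (no replacement)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


def random_split(rows, weights, seed):
    """Normalize the weights, then send each row to the split whose range holds a random draw."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound or i == len(bounds) - 1:  # last split absorbs rounding error
                splits[i].append(row)
                break
    return splits


sample = bernoulli_sample(list(range(10000)), 0.3, 25)
splits = random_split(list(range(10000)), [0.8, 0.2], 25)
print(len(sample), [len(s) for s in splits])
```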


# Concatenating and Appending Rows (Union)

In [41]:
from pyspark.sql import Row

In [42]:
old_schema = customer_new_df.schema

In [43]:
newRows = [
    Row(17 , 60 , 100.0),
    Row(17 , 17 , 200.0)
]

parallelizedRows = spark.sparkContext.parallelize(newRows)
df_new = spark.createDataFrame(parallelizedRows, old_schema)

In [44]:
df = customer_new_df.union(df_new)

In [45]:
df.where(func.col("customer_id") == 17).where(func.col("item_id") ==17).show()


+-----------+-------+------+
|customer_id|item_id|Amount|
+-----------+-------+------+
|         17|     17| 200.0|
+-----------+-------+------+



# Sorting Rows

In [46]:
df.sort("Amount").show()

+-----------+-------+------+
|customer_id|item_id|Amount|
+-----------+-------+------+
|         33|   9769|   0.0|
|         76|   5339|   0.0|
|         38|   7808|  0.01|
|         10|   4612|  0.01|
|         54|   7193|  0.02|
|         50|    269|  0.03|
|         11|     97|  0.05|
|         30|   7862|  0.09|
|         18|   4019|   0.1|
|          1|   5550|   0.1|
|         65|   6431|   0.1|
|         95|   2830|  0.11|
|         67|   3273|  0.12|
|         98|   2656|  0.13|
|         70|   7644|  0.14|
|         10|   8064|  0.15|
|         52|   5358|  0.17|
|         66|   5900|  0.17|
|         79|   9412|  0.18|
|         30|   1726|  0.19|
+-----------+-------+------+
only showing top 20 rows



In [47]:
df.orderBy(func.col("Amount").desc()).show(5)

+-----------+-------+------+
|customer_id|item_id|Amount|
+-----------+-------+------+
|         17|     17| 200.0|
|         17|     60| 100.0|
|         73|   7875| 99.99|
|         73|   7152| 99.99|
|         33|   2844| 99.97|
+-----------+-------+------+
only showing top 5 rows



# Limit

In [48]:
df.limit(2).show()

+-----------+-------+------+
|customer_id|item_id|Amount|
+-----------+-------+------+
|         44|   8602| 37.19|
|         35|   5368| 65.89|
+-----------+-------+------+



In [49]:
df.sort(func.col("Amount").desc()).limit(2).show()

+-----------+-------+------+
|customer_id|item_id|Amount|
+-----------+-------+------+
|         17|     17| 200.0|
|         17|     60| 100.0|
+-----------+-------+------+



# Repartition and Coalesce

In [50]:
df.rdd.getNumPartitions()

9

In [51]:
df.repartition(5)

DataFrame[customer_id: int, item_id: int, Amount: float]

In [52]:
df.repartition(5, func.col("Amount")).coalesce(2)

DataFrame[customer_id: int, item_id: int, Amount: float]
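`repartition` and `coalesce` both return a new DataFrame and leave `df` unchanged, so assign the result (for example `df5 = df.repartition(5)`) before checking `df5.rdd.getNumPartitions()`. The difference between them can be modeled in plain Python: `repartition` shuffles every row into a new set of partitions (by a hash of the key when one is given), while `coalesce` only merges existing neighboring partitions and never increases their number. The functions below are a hypothetical sketch, with Python's built-in `hash` standing in for Spark's own hash function.

```python
def repartition(partitions, n, key=None):
    """Full shuffle: every row is reassigned, by hash(key(row)) % n or round-robin."""
    rows = [row for part in partitions for row in part]
    out = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        target = hash(key(row)) % n if key else i % n
        out[target].append(row)
    return out


def coalesce(partitions, n):
    """No shuffle: merge neighboring partitions; asking for more partitions is a no-op."""
    if n >= len(partitions):
        return partitions
    out = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        out[i * n // len(partitions)].extend(part)  # contiguous groups of old partitions
    return out


parts = [[1, 2], [3], [4, 5, 6], [7], [8, 9]]
print(coalesce(parts, 2))                      # [[1, 2, 3, 4, 5, 6], [7, 8, 9]]
print(repartition(parts, 3, key=lambda x: x))  # [[3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

Because `coalesce` avoids moving rows between machines it is cheaper, but it can leave partitions unevenly sized; `repartition` pays for a shuffle to rebalance.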

In [53]:
friends = spark.read.format("csv")\
.option("inferSchema", "true")\
.load("fakefriends.csv")
friends.printSchema()
friends.createOrReplaceTempView("dfTable")

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)



In [54]:
friends.show(5)

+---+--------+---+---+
|_c0|     _c1|_c2|_c3|
+---+--------+---+---+
|  0|    Will| 33|385|
|  1|Jean-Luc| 26|  2|
|  2|    Hugh| 55|221|
|  3|  Deanna| 40|465|
|  4|   Quark| 68| 21|
+---+--------+---+---+
only showing top 5 rows



In [55]:
friends.where(func.col("_c2") != 33).select("*").show(5)

+---+--------+---+---+
|_c0|     _c1|_c2|_c3|
+---+--------+---+---+
|  1|Jean-Luc| 26|  2|
|  2|    Hugh| 55|221|
|  3|  Deanna| 40|465|
|  4|   Quark| 68| 21|
|  5|  Weyoun| 59|318|
+---+--------+---+---+
only showing top 5 rows



In [56]:
age_condition = func.col("_c2") > 33
friends_condition = func.col("_c3") > 10

friends.where(age_condition & friends_condition).show(5)

+---+------+---+---+
|_c0|   _c1|_c2|_c3|
+---+------+---+---+
|  2|  Hugh| 55|221|
|  3|Deanna| 40|465|
|  4| Quark| 68| 21|
|  5|Weyoun| 59|318|
|  6|Gowron| 37|220|
+---+------+---+---+
only showing top 5 rows



# Numbers

In [57]:
friends.selectExpr("power(_c2, 2) as Square_of_age").show(5)

+-------------+
|Square_of_age|
+-------------+
|       1089.0|
|        676.0|
|       3025.0|
|       1600.0|
|       4624.0|
+-------------+
only showing top 5 rows



In [58]:
friends.selectExpr("_c2" , "round(_c2/5 ,2) as ROUND").show(5)

+---+-----+
|_c2|ROUND|
+---+-----+
| 33|  6.6|
| 26|  5.2|
| 55| 11.0|
| 40|  8.0|
| 68| 13.6|
+---+-----+
only showing top 5 rows



In [59]:
friends.select(func.corr("_c2" , "_c3")).show()

+--------------------+
|      corr(_c2, _c3)|
+--------------------+
|0.027639181401853022|
+--------------------+



In [60]:
friends.describe().show()


+-------+-----------------+----+------------------+-----------------+
|summary|              _c0| _c1|               _c2|              _c3|
+-------+-----------------+----+------------------+-----------------+
|  count|              500| 500|               500|              500|
|   mean|            249.5|null|            43.708|          248.532|
| stddev|144.4818327679989|null|14.864340996711995|147.2217288680643|
|    min|                0| Ben|                18|                1|
|    max|              499|Worf|                69|              499|
+-------+-----------------+----+------------------+-----------------+




In [61]:
colName = "Amount"
quantileProbs = [0.5]
relError = 0.05
df.stat.approxQuantile(colName, quantileProbs, relError)

[49.38999938964844]
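`approxQuantile` trades accuracy for speed. PySpark documents the guarantee as follows: for relative error `err`, the returned value `x` satisfies `floor((p - err) * N) <= rank(x) <= ceil((p + err) * N)`. The plain-Python helpers below (hypothetical names) compute one common definition of the exact quantile and check whether a candidate value meets that rank bound. With `relError = 0.05` over the 10002 rows of `df`, the median returned above may sit up to about 500 ranks away from the exact one.

```python
import math


def exact_quantile(values, p):
    """Value at 1-based rank ceil(p * N) of the sorted data, clamped to [1, N]."""
    ordered = sorted(values)
    rank = min(max(math.ceil(p * len(ordered)), 1), len(ordered))
    return ordered[rank - 1]


def within_rel_error(values, x, p, rel_error):
    """True if some rank of x lies in [floor((p - err) * N), ceil((p + err) * N)]."""
    ordered = sorted(values)
    n = len(ordered)
    lo = math.floor((p - rel_error) * n)
    hi = math.ceil((p + rel_error) * n)
    ranks = [i + 1 for i, v in enumerate(ordered) if v == x]
    return any(lo <= r <= hi for r in ranks)


values = list(range(1, 101))
print(exact_quantile(values, 0.5))              # 50
print(within_rel_error(values, 53, 0.5, 0.05))  # True
print(within_rel_error(values, 60, 0.5, 0.05))  # False
```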

In [62]:
friends.columns

['_c0', '_c1', '_c2', '_c3']

In [63]:
friends.show(2)

+---+--------+---+---+
|_c0|     _c1|_c2|_c3|
+---+--------+---+---+
|  0|    Will| 33|385|
|  1|Jean-Luc| 26|  2|
+---+--------+---+---+
only showing top 2 rows



In [64]:
friends = friends.withColumnRenamed("_c1" , "Name").withColumnRenamed("_c2" , "Age").withColumnRenamed("_c3" , "NoFriends")

In [65]:
friends.select(func.initcap("Name")).show(5)

+-------------+
|initcap(Name)|
+-------------+
|         Will|
|     Jean-luc|
|         Hugh|
|       Deanna|
|        Quark|
+-------------+
only showing top 5 rows



# Strings

In [66]:
friends.select(func.lower(func.col("Name")) , func.upper(func.col("Name"))).show(5)

+-----------+-----------+
|lower(Name)|upper(Name)|
+-----------+-----------+
|       will|       WILL|
|   jean-luc|   JEAN-LUC|
|       hugh|       HUGH|
|     deanna|     DEANNA|
|      quark|      QUARK|
+-----------+-----------+
only showing top 5 rows



In [67]:
friends.select(
func.ltrim(lit(" HELLO ")).alias("ltrim"),
func.rtrim(lit(" HELLO ")).alias("rtrim"),
func.trim(lit(" HELLO ")).alias("trim"),
func.lpad(lit("HELLO"), 3, " ").alias("lp"),
func.rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)


+------+------+-----+---+----------+
| ltrim| rtrim| trim| lp|        rp|
+------+------+-----+---+----------+
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
+------+------+-----+---+----------+
only showing top 2 rows



In [68]:
from pyspark.sql.functions import regexp_replace
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
friends.select(
regexp_replace(func.col("Name"), regex_string, "COLOR").alias("color_Name"),
func.col("Name")).show(2)

+----------+--------+
|color_Name|    Name|
+----------+--------+
|      Will|    Will|
|  Jean-Luc|Jean-Luc|
+----------+--------+
only showing top 2 rows



In [69]:
from pyspark.sql.functions import translate
friends.select(translate(func.col("Name"), "1375", "leet"),func.col("Name"))\
.show(2)

+---------------------------+--------+
|translate(Name, 1375, leet)|    Name|
+---------------------------+--------+
|                       Will|    Will|
|                   Jean-Luc|Jean-Luc|
+---------------------------+--------+
only showing top 2 rows



# Date Time

In [70]:
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")

dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [71]:
dateDF.show()

+---+----------+--------------------+
| id|     today|                 now|
+---+----------+--------------------+
|  0|2022-09-01|2022-09-01 10:32:...|
|  1|2022-09-01|2022-09-01 10:32:...|
|  2|2022-09-01|2022-09-01 10:32:...|
|  3|2022-09-01|2022-09-01 10:32:...|
|  4|2022-09-01|2022-09-01 10:32:...|
|  5|2022-09-01|2022-09-01 10:32:...|
|  6|2022-09-01|2022-09-01 10:32:...|
|  7|2022-09-01|2022-09-01 10:32:...|
|  8|2022-09-01|2022-09-01 10:32:...|
|  9|2022-09-01|2022-09-01 10:32:...|
+---+----------+--------------------+



In [72]:
from pyspark.sql.functions import date_add, date_sub
dateDF.select(date_sub(func.col("today"), 5), date_add(func.col("today"), 5)).show(1)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2022-08-27|        2022-09-06|
+------------------+------------------+
only showing top 1 row



In [73]:
dateDF.withColumn("week_ago", date_sub(func.col("today"), 7))\
.select(func.datediff(func.col("week_ago"), func.col("today"))).show(1)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row



In [74]:
dateDF.select(
func.to_date(lit("2016-01-01")).alias("start"),
func.to_date(lit("2016-01-02")).alias("end"))\
.select(func.months_between(func.col("start"), func.col("end"))).show(1)


+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                     -0.03225806|
+--------------------------------+
only showing top 1 row
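The value -0.03225806 follows from the rule in Spark's `months_between` documentation: if both dates fall on the same day of the month, or both are the last day of their months, the result is a whole number of months; otherwise the difference is computed assuming 31-day months and rounded to 8 digits. A plain-Python sketch for dates only (time of day ignored), with a hypothetical function name:

```python
import calendar
import math
from datetime import date


def months_between(d1, d2, round_off=True):
    """Months from d2 to d1, following the documented 31-day-month rule (dates only)."""
    whole = (d1.year - d2.year) * 12 + (d1.month - d2.month)
    last1 = calendar.monthrange(d1.year, d1.month)[1] == d1.day
    last2 = calendar.monthrange(d2.year, d2.month)[1] == d2.day
    if d1.day == d2.day or (last1 and last2):
        return float(whole)  # whole months, no fractional part
    result = whole + (d1.day - d2.day) / 31  # fractional part based on a 31-day month
    return round(result, 8) if round_off else result


print(months_between(date(2016, 1, 1), date(2016, 1, 2)))  # -0.03225806
```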



# Null Values

In [75]:
friends.na.drop("all")

DataFrame[_c0: int, Name: string, Age: int, NoFriends: int]
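`na.drop` returns a new DataFrame rather than modifying `friends`, which is why the next cell still shows the same rows (and `describe()` earlier reported 500 non-null values in every column, so nothing would be dropped anyway). `how="all"` drops a row only when every column is null, while the default `how="any"` drops it when at least one is. A plain-Python sketch of those two modes over rows represented as dicts; the helper name is hypothetical.

```python
def na_drop(rows, how="any", subset=None):
    """Drop dict rows containing nulls: 'any' = at least one None, 'all' = every value None."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        nulls = [row[c] is None for c in cols]
        if how == "any" and any(nulls):
            continue
        if how == "all" and all(nulls):
            continue
        kept.append(row)
    return kept


rows = [{"a": 1, "b": None}, {"a": None, "b": None}, {"a": 2, "b": 3}]
print(na_drop(rows, "all"))  # [{'a': 1, 'b': None}, {'a': 2, 'b': 3}]
print(na_drop(rows, "any"))  # [{'a': 2, 'b': 3}]
```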

In [76]:
friends.show(5)

+---+--------+---+---------+
|_c0|    Name|Age|NoFriends|
+---+--------+---+---------+
|  0|    Will| 33|      385|
|  1|Jean-Luc| 26|        2|
|  2|    Hugh| 55|      221|
|  3|  Deanna| 40|      465|
|  4|   Quark| 68|       21|
+---+--------+---+---------+
only showing top 5 rows



# Complex Types

In [77]:
friends.select(func.split("Name"," ")).show(5)

+------------------+
|split(Name,  , -1)|
+------------------+
|            [Will]|
|        [Jean-Luc]|
|            [Hugh]|
|          [Deanna]|
|           [Quark]|
+------------------+
only showing top 5 rows



In [78]:
friends.select(func.size(func.split("Name"," "))).show(5)

+------------------------+
|size(split(Name,  , -1))|
+------------------------+
|                       1|
|                       1|
|                       1|
|                       1|
|                       1|
+------------------------+
only showing top 5 rows



In [79]:
from pyspark.sql.functions import create_map
friends.select(create_map(func.col("Name"), func.col("Age")).alias("complex_map"))\
.show(2)


+----------------+
|     complex_map|
+----------------+
|    {Will -> 33}|
|{Jean-Luc -> 26}|
+----------------+
only showing top 2 rows



In [80]:
friends.select(create_map(func.col("Name"), func.col("Age")).alias("complex_map"))\
.selectExpr("complex_map['Will']").show(10)


+-----------------+
|complex_map[Will]|
+-----------------+
|               33|
|             null|
|             null|
|             null|
|             null|
|             null|
|             null|
|               54|
|             null|
|             null|
+-----------------+
only showing top 10 rows



# User-Defined Functions

In [81]:
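def double_friends(x):
    # Spark passes SQL NULL to a Python UDF as None, so guard against it
    return None if x is None else x * 2

from pyspark.sql.functions import udf
# Without an explicit returnType, udf() assumes StringType
double_friendsUDF = udf(double_friends, IntegerType())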


In [82]:
from pyspark.sql.functions import col
friends.select(double_friendsUDF(col("NoFriends"))).show(2)

+-------------------------+
|double_friends(NoFriends)|
+-------------------------+
|                      770|
|                        4|
+-------------------------+
only showing top 2 rows




# Count

In [83]:
friends.select(func.count("Name")).show()

+-----------+
|count(Name)|
+-----------+
|        500|
+-----------+



In [84]:
friends.select(func.countDistinct("Name")).show()

+--------------------+
|count(DISTINCT Name)|
+--------------------+
|                  30|
+--------------------+



In [85]:
friends.select(func.approx_count_distinct("Name", 0.1)).show()  # 0.1 = rsd, the maximum allowed relative standard deviation of the estimate

+---------------------------+
|approx_count_distinct(Name)|
+---------------------------+
|                         25|
+---------------------------+
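`approx_count_distinct` estimates the number of distinct values with a HyperLogLog-style sketch (Spark uses the HyperLogLog++ variant) instead of keeping every value, which is why it reported 25 where the exact count is 30. The code below is plain HyperLogLog, not Spark's HLL++ implementation: each value is hashed, the first `p` bits pick one of `2**p` registers, and each register remembers the longest run of leading zeros it has seen. The standard error is roughly `1.04 / sqrt(2**p)`, so a smaller `rsd` in Spark corresponds to more registers.

```python
import hashlib
import math


def hll_estimate(items, p=10):
    """Plain HyperLogLog estimate of the number of distinct items, using 2**p registers."""
    m = 1 << p
    registers = [0] * m
    for item in items:
        h = int.from_bytes(hashlib.sha1(str(item).encode()).digest()[:8], "big")  # 64-bit hash
        idx = h >> (64 - p)                      # first p bits choose the register
        rest = h & ((1 << (64 - p)) - 1)         # remaining 64 - p bits
        rank = (64 - p) - rest.bit_length() + 1  # position of the leftmost 1-bit
        registers[idx] = max(registers[idx], rank)
    alpha = 0.7213 / (1 + 1.079 / m)
    estimate = alpha * m * m / sum(2.0 ** -r for r in registers)
    zeros = registers.count(0)
    if estimate <= 2.5 * m and zeros:  # small-range correction (linear counting)
        estimate = m * math.log(m / zeros)
    return estimate


print(round(hll_estimate(range(10000))))  # close to 10000, not exact
```

Duplicates never change a register, so feeding the same values several times yields the same estimate.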



# first and last

In [86]:
friends.select(func.first("Name"),func.last("Name")).show()

+-----------+----------+
|first(Name)|last(Name)|
+-----------+----------+
|       Will|     Leeta|
+-----------+----------+



# min and max

In [87]:
friends.select(func.max("NoFriends"),func.min("NoFriends")).show()

+--------------+--------------+
|max(NoFriends)|min(NoFriends)|
+--------------+--------------+
|           499|             1|
+--------------+--------------+



# Sum

In [88]:
friends.select(func.sum("Nofriends")).show()

+--------------+
|sum(Nofriends)|
+--------------+
|        124266|
+--------------+



In [89]:
friends.select(func.sumDistinct("Age")).show()



+-----------------+
|sum(DISTINCT Age)|
+-----------------+
|             2262|
+-----------------+



# Avg

In [90]:
friends.select(func.count("Age") , func.sum("Age") , func.avg("Age") , func.expr("mean(Age)")).show()

+----------+--------+--------+---------+
|count(Age)|sum(Age)|avg(Age)|mean(Age)|
+----------+--------+--------+---------+
|       500|   21854|  43.708|   43.708|
+----------+--------+--------+---------+



# Variance and Standard Deviation

In [91]:
friends.select(func.var_pop("Age"), func.var_samp("Age"),
func.stddev_pop("Age"), func.stddev_samp("Age")).show()

+------------------+------------------+-----------------+------------------+
|      var_pop(Age)|     var_samp(Age)|  stddev_pop(Age)|  stddev_samp(Age)|
+------------------+------------------+-----------------+------------------+
|220.50673599999988|220.94863326653297|14.84946921610331|14.864340996711995|
+------------------+------------------+-----------------+------------------+
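The `_pop` functions divide by `n` while the `_samp` functions divide by `n - 1`, so `var_samp = var_pop * n / (n - 1)`: with n = 500, 220.506736 * 500 / 499 ≈ 220.9486, matching the table, and `stddev_samp(Age)` equals the stddev reported by `describe()` earlier. A plain-Python sketch, checked against the standard `statistics` module; the ages are the first five rows shown above, used only for illustration.

```python
import math
import statistics


def var_pop(xs):
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** 2 for x in xs) / len(xs)  # divide by n


def var_samp(xs):
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** 2 for x in xs) / (len(xs) - 1)  # divide by n - 1


def stddev_pop(xs):
    return math.sqrt(var_pop(xs))


def stddev_samp(xs):
    return math.sqrt(var_samp(xs))


ages = [33, 26, 55, 40, 68]
print(var_pop(ages), statistics.pvariance(ages))
print(var_samp(ages), statistics.variance(ages))
```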



# Covariance and Correlation

In [92]:
friends.select(func.corr("Age" , "Nofriends"), func.covar_samp("Age" , "Nofriends"),
func.covar_pop("Age" , "Nofriends")).show()


+--------------------+--------------------------+-------------------------+
|corr(Age, Nofriends)|covar_samp(Age, Nofriends)|covar_pop(Age, Nofriends)|
+--------------------+--------------------------+-------------------------+
|0.027639181401853022|        60.484312625250496|                60.363344|
+--------------------+--------------------------+-------------------------+
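Covariance follows the same `n` versus `n - 1` pattern (60.363344 * 500 / 499 ≈ 60.4843), and Pearson correlation divides the covariance by both standard deviations, so that divisor cancels out. The result is the same 0.0276 computed earlier as `corr(_c2, _c3)`, because `Age` and `NoFriends` are just the renamed `_c2` and `_c3`. A plain-Python sketch with hypothetical helper names:

```python
import math


def covar_pop(xs, ys):
    mx, my = sum(xs) / len(xs), sum(ys) / len(ys)
    return sum((x - mx) * (y - my) for x, y in zip(xs, ys)) / len(xs)  # divide by n


def covar_samp(xs, ys):
    return covar_pop(xs, ys) * len(xs) / (len(xs) - 1)  # rescale to divide by n - 1


def corr(xs, ys):
    """Pearson correlation; the n vs n - 1 factor cancels, so population forms suffice."""
    return covar_pop(xs, ys) / math.sqrt(covar_pop(xs, xs) * covar_pop(ys, ys))


print(corr([1, 2, 3], [2, 4, 6]))        # perfectly correlated: 1 (up to rounding)
print(covar_samp([1, 2, 3], [2, 4, 6]))  # 2.0 (up to rounding)
```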



# Grouping

In [93]:
friends.groupBy("Age").count().show()

+---+-----+
|Age|count|
+---+-----+
| 31|    8|
| 65|    5|
| 53|    7|
| 34|    6|
| 28|   10|
| 26|   17|
| 27|    8|
| 44|   12|
| 22|    7|
| 47|    9|
| 52|   11|
| 40|   17|
| 20|    5|
| 57|   12|
| 54|   13|
| 48|   10|
| 19|   11|
| 64|   12|
| 41|    9|
| 43|    7|
+---+-----+
only showing top 20 rows



In [94]:
friends.groupBy("Age").agg(func.count("Nofriends").alias("func_count") , func.expr("count(Nofriends)")).show()

+---+----------+----------------+
|Age|func_count|count(Nofriends)|
+---+----------+----------------+
| 31|         8|               8|
| 65|         5|               5|
| 53|         7|               7|
| 34|         6|               6|
| 28|        10|              10|
| 26|        17|              17|
| 27|         8|               8|
| 44|        12|              12|
| 22|         7|               7|
| 47|         9|               9|
| 52|        11|              11|
| 40|        17|              17|
| 20|         5|               5|
| 57|        12|              12|
| 54|        13|              13|
| 48|        10|              10|
| 19|        11|              11|
| 64|        12|              12|
| 41|         9|               9|
| 43|         7|               7|
+---+----------+----------------+
only showing top 20 rows



# Window

In [95]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window\
.partitionBy("Age")

In [96]:
maxFriends = func.max(func.col("Nofriends")).over(windowSpec)
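`maxFriends` is only a column expression; nothing is computed until it is used in a projection, for example `friends.select("Name", "Age", "NoFriends", maxFriends.alias("maxFriendsForAge"))`. Unlike `groupBy("Age").agg(...)`, a window keeps every input row and attaches the per-partition result to each one. A plain-Python model of `max(...).over(Window.partitionBy(...))`, with a hypothetical helper name:

```python
from collections import defaultdict


def max_over_partition(rows, partition_key, value_key, out_key):
    """Attach to every row the max of value_key among rows sharing partition_key."""
    best = defaultdict(lambda: float("-inf"))
    for row in rows:  # first pass: max per partition
        best[row[partition_key]] = max(best[row[partition_key]], row[value_key])
    # second pass: keep every row and add the partition's max
    return [{**row, out_key: best[row[partition_key]]} for row in rows]


rows = [
    {"Name": "Will", "Age": 33, "NoFriends": 385},
    {"Name": "A", "Age": 33, "NoFriends": 10},
    {"Name": "B", "Age": 26, "NoFriends": 2},
]
for r in max_over_partition(rows, "Age", "NoFriends", "maxFriendsForAge"):
    print(r)
```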

# Joins

In [97]:
from pyspark.sql.types import ArrayType

schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('graduate_program', IntegerType(), True),
    StructField('spark_status', ArrayType(IntegerType()), True)])  # holds a list of ids

schema_2 = StructType([
    StructField('id', IntegerType(), True),
    StructField('degree', StringType(), True),
    StructField('department', StringType(), True),
    StructField('school', StringType(), True)])

In [98]:
person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")

In [99]:
person.show()
graduateProgram.show()
sparkStatus.show()

+---+----------------+----------------+---------------+
| id|            name|graduate_program|   spark_status|
+---+----------------+----------------+---------------+
|  0|   Bill Chambers|               0|          [100]|
|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Michael Armbrust|               1|     [250, 100]|
+---+----------------+----------------+---------------+

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  2|Masters|                EECS|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+

+---+--------------+
| id|        status|
+---+--------------+
|500|Vice President|
|250|    PMC Member|
|100|   Contributor|
+---+--------------+



## Inner join 

In [100]:
join_expression = person["id"] == graduateProgram["id"]
person.join(graduateProgram, join_expression).show()


+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  2|Masters|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [101]:
joinType = "inner"
person.join(graduateProgram, join_expression, joinType).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  2|Masters|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
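An inner join keeps only the pairs of rows whose join keys are equal; rows with no partner on the other side are dropped. The plain-Python sketch below shows those semantics as a simple hash join: it indexes one side by key, then probes that index with the other side. It joins on `id`, as `join_expression` does. It assumes small in-memory lists and is not how Spark executes joins; Spark picks among strategies such as broadcast hash joins and sort-merge joins.

```python
# Plain-Python sketch of inner-join semantics (a hash join); not Spark's implementation.
person = [
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100]),
]
graduate_program = [
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley"),
]

def inner_join(left, right, left_key, right_key):
    """Return left_row + right_row for every pair whose keys are equal."""
    # Build phase: index the right side by its join key.
    index = {}
    for r in right:
        index.setdefault(right_key(r), []).append(r)
    # Probe phase: each left row is paired with all matches; unmatched rows vanish.
    return [l + r for l in left for r in index.get(left_key(l), [])]

joined = inner_join(person, graduate_program,
                    left_key=lambda p: p[0], right_key=lambda g: g[0])
for row in joined:
    print(row)
```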



## Outer Joins

In [102]:
joinType = "outer"
person.join(graduateProgram, join_expression, joinType).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  2|Masters|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
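At this point every `id` in `person` has a partner in `graduateProgram`, so the outer join returns exactly the inner-join rows. The difference only appears with unmatched keys: a full outer join keeps unmatched rows from both sides and fills the missing columns with null.

The plain-Python sketch below shows those semantics with trimmed two-column rows. The unmatched rows `(5, "shashank")` and `(6, "Master")` are illustrative additions that make the `None` padding visible.

```python
# Plain-Python sketch of full outer join semantics; None stands in for SQL null.
def full_outer_join(left, right, key=lambda row: row[0]):
    """Full outer equi-join on key(row); unmatched sides are padded with None."""
    left_width = len(left[0]) if left else 0
    right_width = len(right[0]) if right else 0
    # Index right-row positions by join key so each right row is tracked individually.
    index = {}
    for pos, r in enumerate(right):
        index.setdefault(key(r), []).append(pos)
    matched = set()
    out = []
    for l in left:
        positions = index.get(key(l), [])
        for pos in positions:
            out.append(l + right[pos])
            matched.add(pos)
        if not positions:
            # Left row with no partner: the right-hand columns become None.
            out.append(l + (None,) * right_width)
    # Right rows that never matched are emitted once, with the left side None.
    for pos, r in enumerate(right):
        if pos not in matched:
            out.append((None,) * left_width + r)
    return out

person = [(0, "Bill Chambers"), (1, "Matei Zaharia"), (2, "Michael Armbrust")]
program = [(0, "Masters"), (2, "Masters"), (1, "Ph.D.")]

matched_only = full_outer_join(person, program)  # same rows as the inner join
with_unmatched = full_outer_join(person + [(5, "shashank")], program + [(6, "Master")])
print(matched_only)
print(with_unmatched)
```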



## Left Outer Join

In [103]:
row = Row(5,"shashank",0,[500])
temp = spark.createDataFrame([row])
person = person.union(temp)
person.show()

+---+----------------+----------------+---------------+
| id|            name|graduate_program|   spark_status|
+---+----------------+----------------+---------------+
|  0|   Bill Chambers|               0|          [100]|
|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Michael Armbrust|               1|     [250, 100]|
|  5|        shashank|               0|          [500]|
+---+----------------+----------------+---------------+



In [104]:
joinType = "left_outer"
person.join(graduateProgram, join_expression, joinType).show()

+---+----------------+----------------+---------------+----+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status|  id| degree|          department|     school|
+---+----------------+----------------+---------------+----+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|   0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|   1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|   2|Masters|                EECS|UC Berkeley|
|  5|        shashank|               0|          [500]|null|   null|                null|       null|
+---+----------------+----------------+---------------+----+-------+--------------------+-----------+
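A left outer join keeps every row of the left DataFrame. When no right row matches (here `shashank`, id 5), the right-hand columns come back as null. The plain-Python sketch below applies the same rule, using trimmed two-column rows and `None` in place of null.

```python
# Plain-Python sketch of left outer join semantics.
def left_outer_join(left, right, key=lambda row: row[0]):
    """Keep all left rows; pad with None where no right row matches."""
    right_width = len(right[0]) if right else 0
    index = {}
    for r in right:
        index.setdefault(key(r), []).append(r)
    out = []
    for l in left:
        matches = index.get(key(l), [])
        if matches:
            out.extend(l + r for r in matches)
        else:
            out.append(l + (None,) * right_width)  # unmatched left row survives
    return out

person = [(0, "Bill Chambers"), (1, "Matei Zaharia"), (2, "Michael Armbrust"), (5, "shashank")]
program = [(0, "Masters"), (2, "Masters"), (1, "Ph.D.")]

result = left_outer_join(person, program)
for row in result:
    print(row)
```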



## Right Outer Joins

In [105]:
row = (6,"Master","Data Science","UNT")
temp = spark.createDataFrame([row])
graduateProgram = graduateProgram.union(temp)
graduateProgram.show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  2|Masters|                EECS|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
|  6| Master|        Data Science|        UNT|
+---+-------+--------------------+-----------+



In [106]:
joinType = "right_outer"
person.join(graduateProgram, join_expression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  2|Masters|                EECS|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  6| Master|        Data Science|        UNT|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



## Left Semi

In [107]:
joinType = "left_semi"
person.join(graduateProgram, join_expression, joinType).show()

+---+----------------+----------------+---------------+
| id|            name|graduate_program|   spark_status|
+---+----------------+----------------+---------------+
|  0|   Bill Chambers|               0|          [100]|
|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Michael Armbrust|               1|     [250, 100]|
+---+----------------+----------------+---------------+



## Left Anti

In [108]:
joinType = "left_anti"
person.join(graduateProgram, join_expression, joinType).show()

+---+--------+----------------+------------+
| id|    name|graduate_program|spark_status|
+---+--------+----------------+------------+
|  5|shashank|               0|       [500]|
+---+--------+----------------+------------+
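Left semi and left anti joins act as filters on the left DataFrame. Neither adds columns from the right side, which is why both outputs above contain only the four `person` columns. A semi join keeps the left rows that have at least one match, and an anti join keeps the rest.

The plain-Python sketch below uses trimmed rows and assumes non-null keys. SQL NULL keys never match, which a Python set membership test does not model.

```python
# Plain-Python sketch of left semi / left anti join semantics (assumes non-null keys).
def left_semi_join(left, right, key=lambda row: row[0]):
    # Keep left rows with at least one match; a left row is never duplicated,
    # even if several right rows share its key, and no right columns are added.
    right_keys = {key(r) for r in right}
    return [l for l in left if key(l) in right_keys]

def left_anti_join(left, right, key=lambda row: row[0]):
    # Keep left rows with no match at all: the complement of the semi join.
    right_keys = {key(r) for r in right}
    return [l for l in left if key(l) not in right_keys]

person = [(0, "Bill Chambers"), (1, "Matei Zaharia"), (2, "Michael Armbrust"), (5, "shashank")]
program = [(0, "Masters"), (2, "Masters"), (1, "Ph.D."), (6, "Master")]

semi = left_semi_join(person, program)
anti = left_anti_join(person, program)
print(semi)
print(anti)
```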



## Cross Join

In [109]:
person.crossJoin(graduateProgram).show()



+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  0|   Bill Chambers|               0|          [100]|  2|Masters|                EECS|UC Berkeley|
|  0|   Bill Chambers|               0|          [100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  0|   Bill Chambers|               0|          [100]|  6| Master|        Data Science|        UNT|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  2|Masters|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC 

                                                                                
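A cross join pairs every left row with every right row, so its size is the product of the input sizes. At this point `person` and `graduateProgram` each have 4 rows, so the full result has 16 rows; the listing above shows only the first of them. `itertools.product` produces the same pairing in plain Python. The trimmed rows are an assumption for brevity, and Spark does not guarantee row order in general.

```python
from itertools import product

person = [(0, "Bill Chambers"), (1, "Matei Zaharia"), (2, "Michael Armbrust"), (5, "shashank")]
program = [(0, "Masters"), (2, "Masters"), (1, "Ph.D."), (6, "Master")]

# Every (left, right) combination, concatenated into one output row.
pairs = [l + r for l, r in product(person, program)]
print(len(pairs))
```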

# Data Sources

# Basics of Reading Data

In [116]:
temp_df = spark.read.format("csv").option("inferSchema", "true") \
.option("path", "1800.csv") \
.load()

temp_df = temp_df.withColumnRenamed("_c0", "satation") \
.withColumnRenamed("_c1", "date") \
.withColumnRenamed("_c2", "Temp") \
.withColumnRenamed("_c3", "Temp_value")


In [117]:
temp_df.show(5)

+-----------+--------+----+----------+----+----+---+----+
|   satation|    date|Temp|Temp_value| _c4| _c5|_c6| _c7|
+-----------+--------+----+----------+----+----+---+----+
|ITE00100554|18000101|TMAX|       -75|null|null|  E|null|
|ITE00100554|18000101|TMIN|      -148|null|null|  E|null|
|GM000010962|18000101|PRCP|         0|null|null|  E|null|
|EZE00100082|18000101|TMAX|       -86|null|null|  E|null|
|EZE00100082|18000101|TMIN|      -135|null|null|  E|null|
+-----------+--------+----+----------+----+----+---+----+
only showing top 5 rows



In [123]:
temp_df = temp_df.select(func.col("satation"),func.col("date"),func.col("Temp") , func.col("Temp_value"))

In [124]:
temp_df.show(5)


+-----------+--------+----+----------+
|   satation|    date|Temp|Temp_value|
+-----------+--------+----+----------+
|ITE00100554|18000101|TMAX|       -75|
|ITE00100554|18000101|TMIN|      -148|
|GM000010962|18000101|PRCP|         0|
|EZE00100082|18000101|TMAX|       -86|
|EZE00100082|18000101|TMIN|      -135|
+-----------+--------+----+----------+
only showing top 5 rows



In [127]:
# Set the save mode with .mode(); option("mode") only sets the parse mode used when reading
temp_df.write.format("csv") \
.mode("overwrite") \
.option("dateFormat", "yyyy-MM-dd") \
.option("path", "temp.csv") \
.save()

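Spark's DataFrameWriter supports four save modes:

- `errorifexists` (the default, also spelled `error`)
- `append`
- `overwrite`
- `ignore`

The helper `save_lines` below is hypothetical. It only mimics those rules for a single local file; Spark itself writes a directory of part files at the target path.

```python
import os
import tempfile

def save_lines(path, lines, mode="errorifexists"):
    """Hypothetical helper: write text lines to `path` following Spark-style save modes."""
    exists = os.path.exists(path)
    if mode in ("error", "errorifexists"):
        if exists:
            raise FileExistsError(f"{path} already exists")
        file_mode = "w"
    elif mode == "ignore":
        if exists:
            return  # existing data wins; nothing is written
        file_mode = "w"
    elif mode == "overwrite":
        file_mode = "w"  # replace whatever is there
    elif mode == "append":
        file_mode = "a"  # add to the existing data
    else:
        raise ValueError(f"unknown save mode: {mode}")
    with open(path, file_mode) as f:
        f.writelines(line + "\n" for line in lines)

with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "temp.csv")
    save_lines(target, ["a"])              # default mode, new path: succeeds
    try:
        save_lines(target, ["b"])          # default mode, path now exists
        error_raised = False
    except FileExistsError:
        error_raised = True
    save_lines(target, ["b"], "append")    # file now holds a, b
    save_lines(target, ["c"], "ignore")    # no-op because the file exists
    with open(target) as f:
        after_ignore = f.read().split()
    save_lines(target, ["d"], "overwrite") # only d remains
    with open(target) as f:
        final_contents = f.read().split()

print(error_raised, after_ignore, final_contents)
```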

# CSV Read and Write

In [133]:
csvFile = spark.read.format("csv")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("customer-orders.csv")

In [134]:
csvFile.show(5)

+---+----+-----+
|_c0| _c1|  _c2|
+---+----+-----+
| 44|8602|37.19|
| 35|5368|65.89|
|  2|3391|40.64|
| 47|6694|14.98|
| 29| 680|13.08|
+---+----+-----+
only showing top 5 rows
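The `mode` option on a read controls what happens to records that do not fit the schema:

- `PERMISSIVE` (the default) keeps the record and sets the fields it cannot parse to null.
- `DROPMALFORMED` skips the record.
- `FAILFAST` aborts the read.

The hypothetical `read_orders` function below imitates the three modes for a three-column file shaped like `customer-orders.csv`. It assumes two integer columns and a decimal column, as the inferred output above suggests. For simplicity it nulls the whole record in permissive mode. Spark instead nulls only the malformed fields, and it can keep the raw text in a corrupt-record column (`columnNameOfCorruptRecord`).

```python
import csv
import io

def read_orders(text, mode="PERMISSIVE"):
    """Hypothetical reader for 'customer,item,amount' lines with Spark-like parse modes."""
    mode = mode.upper()  # accept any letter case
    rows = []
    for fields in csv.reader(io.StringIO(text)):
        try:
            if len(fields) != 3:
                raise ValueError(f"expected 3 fields, got {len(fields)}")
            rows.append((int(fields[0]), int(fields[1]), float(fields[2])))
        except ValueError:
            if mode == "FAILFAST":
                raise  # stop the whole read at the first malformed record
            if mode == "DROPMALFORMED":
                continue  # skip the malformed record
            rows.append((None, None, None))  # PERMISSIVE (simplified): keep a null row

    return rows

sample = "44,8602,37.19\n35,5368,65.89\noops,1,2.0\n"
permissive = read_orders(sample)
dropped = read_orders(sample, "DROPMALFORMED")
try:
    read_orders(sample, "FAILFAST")
    failfast_raised = False
except ValueError:
    failfast_raised = True

print(permissive, dropped, failfast_raised)
```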



In [136]:
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
.save("my-tsv-file.tsv")

# JSON Read and Write

In [137]:
spark.read.format("json").option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("2010.json").show(5)


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [138]:
csvFile.write.format("json").mode("overwrite").save("my-json-file.json")
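By default, Spark's JSON source reads and writes line-delimited JSON: each record is one self-contained JSON object on its own line. (Set the `multiLine` option to read a single document that spans several lines.) Writing `csvFile` therefore produces part files whose lines should look like the output of this plain-Python sketch. The two sample rows come from the `csvFile.show(5)` output above, and `_c0`..`_c2` are the default column names Spark gave the headerless CSV.

```python
import json

columns = ["_c0", "_c1", "_c2"]
rows = [(44, 8602, 37.19), (35, 5368, 65.89)]

# One compact JSON object per line: the JSON Lines layout.
json_lines = "\n".join(
    json.dumps(dict(zip(columns, row)), separators=(",", ":")) for row in rows
)
print(json_lines)

# Each line parses on its own, so a large file can be split between readers.
parsed = [json.loads(line) for line in json_lines.splitlines() if line]
```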


# Parquet Files

In [139]:
spark.read.format("parquet")\
.load("file.parquet").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [140]:
csvFile.write.format("parquet").mode("overwrite")\
.save("my-parquet-file.parquet")


# ORC Files

In [141]:
spark.read.format("orc").load("orc_file.orc").show(5)


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [142]:
csvFile.write.format("orc").mode("overwrite").save("my-orc-file.orc")

# Spark SQL

In [144]:
spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [145]:
spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |datetable|       true|
|         |  dftable|       true|
+---------+---------+-----------+



In [146]:
temp_df = spark.read.format("csv").option("inferSchema", "true") \
.option("path", "1800.csv") \
.load()

temp_df = temp_df.withColumnRenamed("_c0", "satation") \
.withColumnRenamed("_c1", "date") \
.withColumnRenamed("_c2", "Temp") \
.withColumnRenamed("_c3", "Temp_value")


In [151]:
temp_df.createOrReplaceTempView("temp_table")

In [154]:
spark.sql("select * from temp_table").show(5)

+-----------+--------+----+----------+----+----+---+----+
|   satation|    date|Temp|Temp_value| _c4| _c5|_c6| _c7|
+-----------+--------+----+----------+----+----+---+----+
|ITE00100554|18000101|TMAX|       -75|null|null|  E|null|
|ITE00100554|18000101|TMIN|      -148|null|null|  E|null|
|GM000010962|18000101|PRCP|         0|null|null|  E|null|
|EZE00100082|18000101|TMAX|       -86|null|null|  E|null|
|EZE00100082|18000101|TMIN|      -135|null|null|  E|null|
+-----------+--------+----+----------+----+----+---+----+
only showing top 5 rows



In [171]:
spark.sql("DROP TABLE IF EXISTS temp")

DataFrame[]

In [172]:
spark.sql("CREATE TABLE temp (Satation STRING, date STRING, temp STRING, temp_value LONG) USING csv OPTIONS (path '1800.csv')")

DataFrame[]

In [210]:
spark.sql("show tables").show()

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|  default|    flights|      false|
|  default|nested_data|      false|
|  default|       temp|      false|
|         |  datetable|       true|
|         |    dftable|       true|
|         | temp_table|       true|
+---------+-----------+-----------+



In [174]:
spark.sql("INSERT INTO temp VALUES ('ITE00100554','18000101' , 'TMAX' , -75)")

22/09/01 15:00:20 WARN HadoopFSUtils: The directory file:/home/shashank1/cloud_training/spark/spark-warehouse/1800.csv was not found. Was it deleted very recently?


DataFrame[]

In [214]:
temp_df.columns

['satation', 'date', 'Temp', 'Temp_value', '_c4', '_c5', '_c6', '_c7']

In [215]:
spark.sql("""
INSERT INTO temp
SELECT satation, date, Temp, Temp_value FROM temp_table
""")

DataFrame[]

In [216]:
spark.sql("select * from temp").show()

+-----------+--------+----+----------+
|   Satation|    date|temp|temp_value|
+-----------+--------+----+----------+
|ITE00100554|18000101|TMAX|       -75|
|ITE00100554|18000101|TMIN|      -148|
|GM000010962|18000101|PRCP|         0|
|EZE00100082|18000101|TMAX|       -86|
|EZE00100082|18000101|TMIN|      -135|
|ITE00100554|18000102|TMAX|       -60|
|ITE00100554|18000102|TMIN|      -125|
|GM000010962|18000102|PRCP|         0|
|EZE00100082|18000102|TMAX|       -44|
|EZE00100082|18000102|TMIN|      -130|
|ITE00100554|18000103|TMAX|       -23|
|ITE00100554|18000103|TMIN|       -46|
|GM000010962|18000103|PRCP|         4|
|EZE00100082|18000103|TMAX|       -10|
|EZE00100082|18000103|TMIN|       -73|
|ITE00100554|18000104|TMAX|         0|
|ITE00100554|18000104|TMIN|       -13|
|GM000010962|18000104|PRCP|         0|
|EZE00100082|18000104|TMAX|       -55|
|EZE00100082|18000104|TMIN|       -74|
+-----------+--------+----+----------+
only showing top 20 rows



In [180]:
spark.sql("CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights")

22/09/01 15:02:45 WARN HadoopFSUtils: The directory file:/home/shashank1/cloud_training/spark/spark-warehouse/2010.json was not found. Was it deleted very recently?


DataFrame[]

In [183]:
spark.sql("DROP TABLE flights_from_select")

DataFrame[]

In [184]:
spark.sql("DROP TABLE IF EXISTS flights_csv")

DataFrame[]

In [185]:
spark.sql("CACHE TABLE flights")

DataFrame[]

In [186]:
spark.sql("UNCACHE TABLE flights")

DataFrame[]

In [187]:
spark.sql("CREATE VIEW just_usa_view AS SELECT * FROM flights WHERE DEST_COUNTRY_NAME = 'United States'")

DataFrame[]

In [188]:
spark.sql("show tables").show()

+---------+-------------+-----------+
|namespace|    tableName|isTemporary|
+---------+-------------+-----------+
|  default|      flights|      false|
|  default|just_usa_view|      false|
|  default|         temp|      false|
|         |    datetable|       true|
|         |      dftable|       true|
|         |   temp_table|       true|
+---------+-------------+-----------+



In [189]:
spark.sql("DROP VIEW IF EXISTS just_usa_view")

DataFrame[]

In [190]:
spark.sql("show tables").show()

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|  default|   flights|      false|
|  default|      temp|      false|
|         | datetable|       true|
|         |   dftable|       true|
|         |temp_table|       true|
+---------+----------+-----------+



In [192]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [193]:
spark.sql("CREATE DATABASE some_db")

DataFrame[]

In [194]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
|  some_db|
+---------+



In [195]:
spark.sql("USE some_db")

DataFrame[]

In [197]:
spark.sql("SHOW tables").show()

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|         | datetable|       true|
|         |   dftable|       true|
|         |temp_table|       true|
+---------+----------+-----------+



In [199]:
spark.sql("SELECT * FROM default.flights").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
+-----------------+-------------------+-----+



In [201]:
spark.sql("SELECT current_database()").show()

+------------------+
|current_database()|
+------------------+
|           some_db|
+------------------+



In [202]:
spark.sql("USE default")

DataFrame[]

In [203]:
spark.sql("DROP DATABASE IF EXISTS some_db")

DataFrame[]

In [204]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [218]:
spark.sql("select * from temp").show(5)

+-----------+--------+----+----------+
|   Satation|    date|temp|temp_value|
+-----------+--------+----+----------+
|ITE00100554|18000101|TMAX|       -75|
|ITE00100554|18000101|TMIN|      -148|
|GM000010962|18000101|PRCP|         0|
|EZE00100082|18000101|TMAX|       -86|
|EZE00100082|18000101|TMIN|      -135|
+-----------+--------+----+----------+
only showing top 5 rows



In [221]:
spark.sql("""
SELECT CASE WHEN temp = 'TMAX' THEN 1
            WHEN temp = 'TMIN' THEN 0
            ELSE -1 END AS new_column
FROM temp
""").show(5)

+----------+
|new_column|
+----------+
|         1|
|         0|
|        -1|
|         1|
|         0|
+----------+
only showing top 5 rows
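`CASE WHEN ... THEN ... ELSE ... END` checks its branches in order and returns the value of the first one that matches. So `TMAX` maps to 1, `TMIN` to 0, and anything else (such as `PRCP`) to -1. The expression is standard SQL, so it can be tried outside Spark.

The sketch below uses Python's built-in `sqlite3` as a stand-in engine, not Spark. The `weather` table and its column names are hypothetical, and it holds the first five readings shown above.

```python
import sqlite3

# In-memory SQLite database standing in for Spark SQL; table/column names are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE weather (station TEXT, date TEXT, element TEXT, value INTEGER)")
conn.executemany(
    "INSERT INTO weather VALUES (?, ?, ?, ?)",
    [
        ("ITE00100554", "18000101", "TMAX", -75),
        ("ITE00100554", "18000101", "TMIN", -148),
        ("GM000010962", "18000101", "PRCP", 0),
        ("EZE00100082", "18000101", "TMAX", -86),
        ("EZE00100082", "18000101", "TMIN", -135),
    ],
)

query = """
SELECT CASE WHEN element = 'TMAX' THEN 1
            WHEN element = 'TMIN' THEN 0
            ELSE -1 END AS new_column
FROM weather
ORDER BY rowid
"""
new_column = [row[0] for row in conn.execute(query)]
print(new_column)
```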



In [205]:
spark.sql("CREATE VIEW IF NOT EXISTS nested_data AS SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights")

DataFrame[]

In [207]:
spark.sql("SELECT * FROM nested_data").show()

+-------+-----+
|country|count|
+-------+-----+
+-------+-----+



In [209]:
spark.sql("SELECT DEST_COUNTRY_NAME, ARRAY(1, 2, 3) FROM flights").show()

+-----------------+--------------+
|DEST_COUNTRY_NAME|array(1, 2, 3)|
+-----------------+--------------+
+-----------------+--------------+



In [224]:
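from pyspark.sql.types import IntegerType

def square_temp(x):
    # NULL values arrive as None; pass them through instead of raising TypeError
    return x ** 2 if x is not None else None

spark.udf.register("square_temp", square_temp, IntegerType())

spark.sql("select square_temp(temp_value) from temp").show(10)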

+-----------------------+
|square_temp(temp_value)|
+-----------------------+
|                   5625|
|                  21904|
|                      0|
|                   7396|
|                  18225|
|                   3600|
|                  15625|
|                      0|
|                   1936|
|                  16900|
+-----------------------+
only showing top 10 rows
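Registering a UDF binds a Python function to a name that SQL queries can call once per row. Python's `sqlite3` module has an analogous hook, `Connection.create_function`. The sketch below uses it as a stand-in to show the same pattern without a Spark session; it is not Spark's API, and the `weather` table is hypothetical. SQL NULL inputs reach the Python function as `None`, so the function should handle them explicitly.

```python
import sqlite3

def square_temp(x):
    # SQL NULL arrives as None; return None rather than raising TypeError
    return None if x is None else x * x

conn = sqlite3.connect(":memory:")
# Register the Python function under a SQL-callable name that takes one argument
conn.create_function("square_temp", 1, square_temp)

conn.execute("CREATE TABLE weather (value INTEGER)")  # hypothetical table
conn.executemany("INSERT INTO weather VALUES (?)", [(-75,), (-148,), (0,), (None,)])

squares = [row[0] for row in conn.execute("SELECT square_temp(value) FROM weather ORDER BY rowid")]
print(squares)
```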



In [225]:
spark.sql("select * from temp where temp_value = ( select min(temp_value) from temp)").show()

+-----------+--------+----+----------+
|   Satation|    date|temp|temp_value|
+-----------+--------+----+----------+
|ITE00100554|18000101|TMIN|      -148|
+-----------+--------+----+----------+
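The scalar subquery `(select min(temp_value) from temp)` does not depend on the outer row, so it yields a single value. The outer query then keeps every row equal to that value, which means ties would all be returned.

In plain Python the same two steps look like this, using the first five readings shown earlier as sample data. SQL's `MIN` also skips NULLs; this sketch assumes there are none.

```python
readings = [
    ("ITE00100554", "18000101", "TMAX", -75),
    ("ITE00100554", "18000101", "TMIN", -148),
    ("GM000010962", "18000101", "PRCP", 0),
    ("EZE00100082", "18000101", "TMAX", -86),
    ("EZE00100082", "18000101", "TMIN", -135),
]

# Step 1: the scalar subquery, evaluated once.
min_value = min(r[3] for r in readings)
# Step 2: the outer WHERE clause; every row equal to the minimum survives.
coldest = [r for r in readings if r[3] == min_value]
print(coldest)
```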

