In [0]:
%pyspark

# DDL-style schema for the blog-author sample rows below (column order matters).
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"
# Create our static data: one list per author, values in `schema` column order.
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter","LinkedIn"]],
    [2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
    [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
]

# Create a DataFrame using the schema defined above
blogs_df = spark.createDataFrame(data, schema)
# Show the DataFrame; it should reflect our table above
blogs_df.show()
# Print the schema used by Spark to process the DataFrame.
# printSchema() prints the tree itself and returns None, so it is not wrapped
# in print() (that would emit a stray "None" line after the schema).
blogs_df.printSchema()

In [1]:
%pyspark

# Import Spark's aggregate `sum` under an alias. A plain `import sum` would
# shadow Python's builtin `sum` in every later paragraph of this session.
from pyspark.sql.functions import col, sum as spark_sum

# Total hits across all authors. The derived `full_name` column only
# illustrates selectExpr with a SQL expression; the global agg() discards it.
(blogs_df
    .selectExpr("*" , "concat(first, ' ', last) full_name")
    .agg(spark_sum(col("Hits")).alias("Hits"))
    .show())

In [2]:
%pyspark

from pyspark.sql.functions import expr

# Authors whose posts received more than 10,000 hits; the SQL predicate is
# wrapped in expr() and passed to filter(), the alias of where().
(blogs_df
    .filter(expr("hits > 10000"))
    .show())


In [3]:


// Application id of the SparkContext behind this notebook's session.
spark.sparkContext.applicationId

In [4]:
%pyspark
import os
from pyspark.sql.types import *

# The sample files ship with Spark under $SPARK_HOME/examples. Fail with a
# clear message instead of a TypeError from "file://" + None when unset.
SPARK_HOME = os.getenv('SPARK_HOME')
if not SPARK_HOME:
    raise RuntimeError("SPARK_HOME is not set; cannot locate the Spark example files")
EXAMPLES_DIR = "file://" + SPARK_HOME + "/examples/src/main/resources"

# Read data from json file
# link for this people.json (https://github.com/apache/spark/blob/master/examples/src/main/resources/people.json)
# Use hdfs path if you are using hdfs
df1 = spark.read.json(EXAMPLES_DIR + "/people.json")
df1.printSchema()
df1.show()

# Read data from csv file. You can customize it via spark.read.options,
# e.g. here the field separator (sep) and whether the first line is a header.
df2 = spark.read.options(sep=";", header=True).csv(EXAMPLES_DIR + "/people.csv")
df2.printSchema()
df2.show()

# Specify schema for your csv file instead of reading every column as a string
from pyspark.sql.types import StructType, StringType, IntegerType

schema = (StructType()
    .add("name", StringType(), True)
    .add("age", IntegerType(), True)
    .add("job", StringType(), True))

df3 = (spark.read
    .options(sep=";", header=True)
    .schema(schema)
    .csv(EXAMPLES_DIR + "/people.csv"))

df3.printSchema()
df3.show()



In [5]:
%pyspark

# Current value of spark.sql.files.maxPartitionBytes; the bare last
# expression is what the paragraph displays.
max_partition_bytes = spark.conf.get("spark.sql.files.maxPartitionBytes")
max_partition_bytes


In [6]:
// Preview rows of the homework.bank catalog table (explicit parentheses on show).
spark.read.table("homework.bank").show()

 

What were all the different types of fire calls in 2018?
What months within the year 2018 saw the highest number of fire calls?
Which neighborhood in San Francisco generated the most fire calls in 2018?
Which neighborhoods had the worst response times to fire calls in 2018?
Which week of 2018 had the most fire calls?
Is there a correlation between neighborhood, zip code, and number of fire calls?
How can we use Parquet files or SQL tables to store this data and read it back?


In [8]:
%pyspark

# Load the SF fire-calls CSV; the first line holds the column names. No schema
# or inferSchema is given, so every column comes back as a string.
fireDF = spark.read.csv("/user/admin/sf-fire-calls.csv", header=True)
fireDF.show(n=100, truncate=False)


In [9]:
%pyspark

from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.functions import year, month, countDistinct

# Distinct call types recorded in 2018.
# NOTE(review): CallDate is parsed as US-style MM/dd/yyyy (e.g. 01/11/2002),
# which is how the public SF fire-calls dataset stores it — confirm against the
# file. The previous dd/MM/yyyy pattern swapped day and month and turned every
# date with day > 12 into NULL, silently dropping those rows.
# coalesce(1) was removed: it forced the whole scan and filter into one task.
(fireDF
    .withColumn("CallDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .withColumn("CallYear", year("CallDate"))
    .where(col("CallYear") == 2018)
    .select("CallType")
    .distinct()
    .show(1000, False))

In [10]:
%pyspark
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.functions import year, month, countDistinct

# Number of distinct calls per month of 2018, busiest month first.
# NOTE(review): CallDate is parsed as US-style MM/dd/yyyy, matching the public
# SF fire-calls dataset — confirm against the file. With dd/MM/yyyy the
# "month" was really the day of month, and days > 12 became NULL.
(fireDF
    .withColumn("CallDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .withColumn("CallMonth", month("CallDate"))
    .withColumn("CallYear", year("CallDate"))
    .where(col("CallYear") == 2018)
    .groupBy("CallMonth")
    .agg(countDistinct("callNumber").alias("call_count"))
    .orderBy("call_count", ascending=False)
    .show())

In [11]:
%pyspark

# Imported here so the paragraph runs on its own instead of relying on
# names left behind by earlier paragraphs.
from pyspark.sql.functions import col, countDistinct, to_timestamp, year

# Neighborhoods ranked by number of distinct calls in 2018. The question
# above is about 2018; without the year filter this ranked the full history.
# NOTE(review): assumes CallDate is MM/dd/yyyy as in the SF dataset — confirm.
(fireDF
    .withColumn("CallYear", year(to_timestamp(col("CallDate"), "MM/dd/yyyy")))
    .where(col("CallYear") == 2018)
    .groupBy("Neighborhood")
    .agg(countDistinct("callNumber").alias("call_count"))
    .orderBy("call_count", ascending=False)
    .show(1000, False))

In [12]:
%pyspark

from pyspark.sql.functions import col, to_timestamp, year

# Fire-related calls in 2018, longest response delay first.
# fireDF was read from CSV without a schema, so delay is a string; it is cast
# to double so the sort is numeric (as strings, "9.5" sorts above "10.2").
# NOTE(review): assumes CallDate is MM/dd/yyyy as in the SF dataset — confirm.
(fireDF
    .where("lower(CallType) like '%fire%'")
    .withColumn("CallYear", year(to_timestamp(col("CallDate"), "MM/dd/yyyy")))
    .where(col("CallYear") == 2018)
    .select("Neighborhood", col("delay").cast("double").alias("delay"), "callType")
    .orderBy("delay", ascending=False)
    .show())

In [13]:
%pyspark

# to_timestamp/col imported here so the paragraph does not depend on others.
from pyspark.sql.functions import col, to_timestamp, weekofyear, year

# Weeks of 2018 ranked by number of distinct fire-related calls.
# NOTE(review): CallDate is parsed as MM/dd/yyyy as in the SF dataset — confirm.
# weekofyear() returns the ISO-8601 week, so 2018-12-31 (a Monday) reports
# week 1 and is counted together with the first week of January 2018.
(fireDF
    .where("lower(CallType) like '%fire%'")
    .withColumn("CallDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .withColumn("CallWeek", weekofyear("CallDate"))
    .withColumn("CallYear", year("CallDate"))
    .where(col("CallYear") == 2018)
    .groupBy("CallYear", "CallWeek")
    .agg(countDistinct("callNumber").alias("call_count"))
    .orderBy("call_count", ascending=False)
    .show())


In [15]:
%pyspark
# Application id of the SparkContext behind this session.
print(spark.sparkContext.applicationId)

In [16]:
%pyspark

# Show the session's shuffle-partition setting, lower it to 20, then show it again.
shuffle_key = "spark.sql.shuffle.partitions"

print(spark.conf.get(shuffle_key))

spark.conf.set(shuffle_key, 20)

print(spark.conf.get(shuffle_key))

In [17]:
// DataFrame over the homework.bank catalog table (only its schema is displayed).
spark.read.table("homework.bank")

In [18]:
%pyspark

# Greet whatever is typed into the Zeppelin "name" text-box form.
visitor_name = z.textbox("name")
print("Hello " + visitor_name)

In [20]:
%pyspark

df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]) \
            .toDF("id", "name", "age", "country")

# Create a udf from a python lambda. Spark hands SQL NULL to Python as None,
# so pass it through instead of raising AttributeError inside the executor.
from pyspark.sql.functions import udf
udf1 = udf(lambda e: e.upper() if e is not None else None)
df2 = df1.select(udf1(df1["name"]))
df2.show()

# UDF could also be used in filter, in this case the return type must be Boolean
# We can also use annotation to create udf
from pyspark.sql.types import *
@udf(returnType=BooleanType())
def udf2(e):
    """Return True when age `e` is at least 20.

    A NULL age yields None, which filter() treats as false; comparing None with
    an int would otherwise raise TypeError in Python 3.
    """
    if e is None:
        return None
    return e >= 20

df3 = df1.filter(udf2(df1["age"]))
df3.show()

# UDF could also accept more than 1 argument. Like SQL concat, the result is
# NULL when either input is NULL (instead of a TypeError on None + str).
udf3 = udf(lambda e1, e2: None if e1 is None or e2 is None else e1 + "_" + e2)
df4 = df1.select(udf3(df1["name"], df1["country"]).alias("name_country"))
df4.show()


In [21]:
%pyspark

people_rows = [(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]
df1 = spark.createDataFrame(people_rows).toDF("id", "name", "age", "country")

# Shortcut aggregations are available right after groupBy: count/min/max/avg/sum.
df2 = df1.groupBy("country").count()
df2.show()

# A dict of {column: aggregate-name} runs several aggregations in one pass.
df3 = df1.groupBy("country").agg({"age": "avg", "id": "count"})
df3.show()

import pyspark.sql.functions as F
# Aggregate Column expressions work too and let you name each output column.
df4 = df1.groupBy("country").agg(
    F.avg(df1["age"]).alias("avg_age"),
    F.count(df1["id"]).alias("count"),
)
df4.show()

# A dict cannot hold two aggregations of the same column (its keys are unique),
# so use Column expressions when one column needs several aggregates.
df5 = df1.groupBy("country").agg(
    F.avg(df1["age"]).alias("avg_age"),
    F.max(df1["age"]).alias("max_age"),
)
df5.show()








In [23]:
%pyspark

people_rows = [(1, "andy", 20, 1), (2, "jeff", 23, 2), (3, "james", 18, 3)]
df1 = spark.createDataFrame(people_rows).toDF("id", "name", "age", "c_id")
df1.show()

country_rows = [(1, "USA"), (2, "China")]
df2 = spark.createDataFrame(country_rows).toDF("c_id", "c_name")
df2.show()

# When both sides use the same key name, passing that name is enough.
df3 = df1.join(df2, "c_id")
df3.show()

# An explicit join condition also works, and is the way to go when the key
# names differ between the two tables.
c_id_matches = df1["c_id"] == df2["c_id"]
df4 = df1.join(df2, c_id_matches)
df4.show()

# The join type follows the condition; when omitted it is an inner join.
df5 = df1.join(df2, c_id_matches, "left_outer")
df5.show()

In [24]:
%pyspark

people_rows = [("andy", 20, 1, 1), ("jeff", 23, 1, 2), ("james", 12, 2, 2)]
df1 = spark.createDataFrame(people_rows).toDF("name", "age", "key_1", "key_2")
df1.show()

country_rows = [(1, 1, "USA"), (2, 2, "China")]
df2 = spark.createDataFrame(country_rows).toDF("key_1", "key_2", "country")
df2.show()

# Join on the composite key (key_1, key_2).

# With identical key names on both sides, pass the list of names.
df3 = df1.join(df2, ["key_1", "key_2"])
df3.show()

# Otherwise AND together one equality per key; this explicit form is what you
# need when the key names differ between the two tables.
both_keys_match = (df1["key_1"] == df2["key_1"]) & (df1["key_2"] == df2["key_2"])
df4 = df1.join(df2, both_keys_match)
df4.show()



In [25]:
%pyspark

df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]) \
           .toDF("id", "name", "age", "country")
# call createOrReplaceTempView first if you want to query this DataFrame via sql
df1.createOrReplaceTempView("people")
# SparkSession.sql returns a DataFrame
df2 = spark.sql("select name, age from people")
df2.show()

# A Python function must be registered before SQL can call it by name.
# SQL NULL arrives as None, so return None instead of raising AttributeError.
spark.udf.register("udf1", lambda e: e.upper() if e is not None else None)
df3 = spark.sql("select udf1(name), age from people")
df3.show()

In [26]:
%pyspark

# Imported here: `col` was otherwise only brought in by an earlier paragraph.
from pyspark.sql.functions import col

# 10 million ids and their squares, used to demonstrate caching.
df = spark.range(10000000).toDF("id").withColumn("square", col("id") * col("id"))

df.cache()  # Mark the DataFrame for caching (lazy: nothing is stored yet)

df.count()  # Run an action to materialize the cache

In [27]:
%pyspark

# Same action again; if the previous paragraph materialized the cache, this
# count is answered from the cached data.
cached_row_count = df.count()
cached_row_count

In [28]:
%pyspark

# df was already cached by df.cache() above; calling persist() again is a no-op
# that only logs "Asked to cache already cached data". Persist only when
# needed, then display the storage level the data is actually kept at.
if not df.is_cached:
    df.persist()
df.storageLevel

In [30]:
%spark

import org.apache.spark.sql.functions.broadcast

// Disable automatic broadcast joins so the only broadcast in the plan below
// comes from the explicit broadcast() hint.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
// Generate some sample data for two data sets

// Lookup tables for states and purchased items, keyed 0..5.
val states = Map(0 -> "AZ", 1 -> "CO", 2 -> "CA", 3 -> "TX", 4 -> "NY", 5 -> "MI")
val items = Map(0 -> "SKU-0", 1 -> "SKU-1", 2 -> "SKU-2", 3 -> "SKU-3", 4 -> "SKU-4", 5 -> "SKU-5")
// Fixed seed so the generated data is reproducible across runs.
val rnd = new scala.util.Random(42)

// Create DataFrames. Keys are drawn with nextInt(map.size): the previous
// nextInt(5) only returns 0..4, so "MI" and "SKU-5" could never be picked.
val usersDF = (0 to 100000)
    .map(id => (id, s"user_${id}", s"user_${id}@databricks.com", states(rnd.nextInt(states.size))))
    .toDF("uid", "login", "email", "user_state")

val ordersDF = (0 to 100000)
    .map(r => (r, r, rnd.nextInt(10000), 10 * r * 0.2d, states(rnd.nextInt(states.size)), items(rnd.nextInt(items.size))))
    .toDF("transaction_id", "quantity", "users_id", "amount", "state", "items")

// Left-join orders to users, broadcasting the users side explicitly.
val usersOrdersDF = ordersDF
    .join(broadcast(usersDF), $"users_id" === $"uid", "left")
    .select("users_id", "transaction_id")

// Show the joined results
usersOrdersDF.show(false)

In [31]:
// Print the physical plan of the broadcast join (non-extended form).
usersOrdersDF.explain(false)

In [32]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode

// Save as managed Parquet tables, bucketed and sorted within each bucket on the
// join key. sortBy records the sort order in the table's bucket spec. The
// previous DataFrame-level orderBy paid for a full global sort, yet the
// catalog still did not treat the bucket files as sorted.
usersDF.write
    .format("parquet")
    .bucketBy(8, "uid")
    .sortBy("uid")
    .mode(SaveMode.Overwrite)
    .saveAsTable("UsersTbl")

ordersDF.write
    .format("parquet")
    .bucketBy(8, "users_id")
    .sortBy("users_id")
    .mode(SaveMode.Overwrite)
    .saveAsTable("OrdersTbl")

// Cache the tables
spark.sql("CACHE TABLE UsersTbl")
spark.sql("CACHE TABLE OrdersTbl")

// Read them back in
val usersBucketDF = spark.table("UsersTbl")
val ordersBucketDF = spark.table("OrdersTbl")

// Join on the bucketing keys and show the results
val joinUsersOrdersBucketDF = ordersBucketDF
    .join(usersBucketDF, $"users_id" === $"uid")

joinUsersOrdersBucketDF.show(false)

In [33]:

import org.apache.spark.sql.functions.lit

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// City lookup keyed 0..5: five keys map to "Moscow" and one to "Kazan".
val city = Map(0 -> "Moscow", 1 -> "Moscow", 2 -> "Moscow", 3 -> "Moscow", 4 -> "Moscow", 5 -> "Kazan")
// Fixed seed so the generated data is reproducible across runs.
val rnd = new scala.util.Random(42)

// Write 100 batches of 100,001 (uid, city) rows, each tagged with its batch
// number in partition_id. The first batch overwrites the table, so re-running
// this paragraph rebuilds it instead of appending another 100 duplicate batches.
for (i <- 1 to 100) {
    (0 to 100000)
        .map(id => (id, city(rnd.nextInt(city.size))))
        .toDF("uid", "city")
        .withColumn("partition_id", lit(i))
        .write
        .mode(if (i == 1) "overwrite" else "append")
        .saveAsTable("kotelnikov.uid_city")
}




In [34]:

// Number of uid rows per city in kotelnikov.uid_city (up to 100 cities shown).
val uidCountByCity = spark.table("kotelnikov.uid_city")
    .groupBy("city")
    .agg(count("uid").as("uid_count"))
uidCountByCity.show(100)