## Welcome to Zeppelin.
##### This is a live tutorial; you can run the code yourself (Shift-Enter to run).

### Заголовок

In [2]:
%sh

# Report disk usage of the current working directory, descending at most two levels.
du -d 2


In [3]:
%sh

# Print the summarized (-s), human-readable (-h) size of each entry under the Spark warehouse directory.
hdfs dfs -du -s -h /apps/spark/warehouse/* 

In [4]:
%pyspark

# Look up the ID of the Spark application backing this interpreter via the SparkContext.
sc.applicationId

In [5]:
// No %interpreter directive: this paragraph runs on the note's default interpreter.
// List the databases known to the catalog and print them as a text table.
spark.sql("show databases").show

In [6]:
// Shut down the SparkContext and the SparkSession.
// NOTE(review): later paragraphs keep using `sc` and `spark`; presumably the interpreter
// has to be restarted after running this — confirm whether this paragraph should stay in the note.
sc.stop
spark.stop

In [7]:
%pyspark

# Same application ID lookup, this time reaching the SparkContext through the SparkSession.
spark.sparkContext.applicationId

In [8]:
%pyspark

# Query the catalog for its databases and hand the result to Zeppelin's table display.
databases_df = spark.sql("show databases")
z.show(databases_df)


In [9]:
%pyspark

# Report which Spark application this note is attached to.
app_id = sc.applicationId
print("Application ID: " + app_id + "\n")

print("Spark configuration: \n")

# getAll() yields (key, value) string pairs; print one setting per line.
conf_entries = spark.sparkContext.getConf().getAll()
for conf_key, conf_value in conf_entries:
    print(conf_key + " \t= " + conf_value)

In [10]:
%pyspark 

# Department reference data as (dept_name, dept_id) tuples.
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]
deptColumns = ["dept_name", "dept_id"]

# Only column names are given as the schema, so Spark infers the column types.
deptDF = spark.createDataFrame(dept, deptColumns)

deptDF.printSchema()
deptDF.show(truncate=False)

In [11]:
%pyspark

# Render the department DataFrame with Zeppelin's built-in display instead of plain text.
z.show(deptDF)

In [12]:
%pyspark

# Path of the M&M dataset CSV; later paragraphs read the file through this variable.
# NOTE(review): the variable is spelled "nmn" while the file is "mnm" — the name is kept
# because other paragraphs reference it as-is.
nmn_dataset_path = "/user/admin/mnm_dataset.csv"

In [13]:
%pyspark

# Explicit DDL schema for the M&M CSV, so Spark does not have to infer column types.
# NOTE(review): the name says "bank" although the columns describe the M&M dataset.
bank_schema = "State STRING, Color STRING, Count INTEGER"

# The first line of the file is treated as a header rather than as data.
df = (
    spark.read
    .option("header", True)
    .schema(bank_schema)
    .csv(nmn_dataset_path)
)

# count() is an action, so this line triggers an actual read of the file.
print(df.count())

df.printSchema()

In [14]:
%pyspark

# Store the M&M DataFrame as table default.mnm, replacing any earlier copy.
(
    df.write
    .mode("overwrite")
    .saveAsTable("default.mnm")
)

# Read the table back from the catalog to inspect what was stored.
spark.table("default.mnm").printSchema()

# Last expression of the paragraph: number of partitions of the table as read back.
spark.table("default.mnm").rdd.getNumPartitions()

In [15]:
%pyspark
# Print the first rows of default.mnm as a text table.
spark.table("default.mnm").show()

In [16]:
%pyspark
# List the tables registered in the `default` database.
spark.sql("show tables in default").show()


In [17]:
%sh

# List the contents of every entry under the Spark warehouse directory.
hdfs dfs -ls /apps/spark/warehouse/*

In [18]:
%sh

# List the contents of every entry under the Spark warehouse directory.
# NOTE(review): this is the same command as the paragraph directly before it — confirm it is still needed.
hdfs dfs -ls /apps/spark/warehouse/*

In [19]:
// No %interpreter directive: this paragraph runs on the note's default interpreter.
// Print the first rows of homework.bank.
// NOTE(review): the paragraph that writes homework.bank (saveAsTable) appears further down
// in this note, so on a fresh run this presumably needs that paragraph executed first.
spark.table("homework.bank").show

In [20]:
// Render the contents of the mnm table with Zeppelin's built-in display.
z.show(spark.sql("select * from mnm"))

In [21]:
%pyspark

# Rewrite default.mnm from df after redistributing it into 10 partitions.
(
    df.repartition(10)
    .write
    .mode("overwrite")
    .saveAsTable("default.mnm")
)

In [22]:
%sh


# List the files under the warehouse directory named mnm
# (presumably the storage location of table default.mnm — confirm).
hdfs dfs -ls /apps/spark/warehouse/mnm

In [23]:
%pyspark

# Count the rows currently stored in default.mnm.
spark.table("default.mnm").count()

In [24]:
%pyspark

# Number of partitions Spark uses when reading default.mnm back as an RDD.
spark.table("default.mnm").rdd.getNumPartitions()

In [25]:

import scala.io.Source._
import org.apache.spark.sql.{Dataset, SparkSession}

// Download the Zeppelin tutorial bank dataset and store it as table homework.bank.
val url = "https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv"

// Read all lines eagerly, then close the Source so the underlying connection is released.
// getLines() replaces mkString.stripMargin.lines: on JDK 11+ `String.lines` resolves to the
// Java method (a Stream without toList), and stripMargin is not meant for raw CSV data.
val bankSource = fromURL(url)
val res: List[String] = try bankSource.getLines().toList finally bankSource.close()
val csvData: Dataset[String] = spark.sparkContext.parallelize(res).toDS()

// The data is ';'-separated with a header row; column types are inferred from the values.
val bank = spark.read
  .option("header", true)
  .option("delimiter", ";")
  .option("inferSchema", true)
  .csv(csvData)
bank.printSchema()

bank.write.mode("overwrite").saveAsTable("homework.bank")

// Refresh the table's cached metadata before reading it back.
spark.sql("refresh table homework.bank")
spark.table("homework.bank").show

In [26]:
// Resolve homework.bank as a DataFrame; nothing is printed beyond the interpreter's
// own echo of the resulting value.
spark.table("homework.bank")

In [27]:
%pyspark

# z.textbox("name") adds a text input called "name" to this paragraph and returns its
# current value, which is then greeted.
print("Hello " + z.textbox("name"))

In [28]:
%pyspark

# DDL schema for the sample blog data; Campaigns is an array of strings.
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

# Create our static data: one list per row, in the column order of the schema.
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
    [2, "Brooke", "Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
    [5, "Matei", "Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]],
]

# Create a DataFrame using the schema defined above
blogs_df = spark.createDataFrame(data, schema)
# Show the DataFrame; it should reflect the rows above
blogs_df.show()
# printSchema() prints the schema itself and returns None, so it is not wrapped in
# print() — doing so would emit an extra "None" line.
blogs_df.printSchema()

In [29]:
%pyspark

# Preview the SF fire calls CSV with Spark's default CSV options.
# NOTE(review): no header/schema options are passed, so columns come back as _c0, _c1, ...
# typed as strings — confirm whether the file has a header row that should be used.
spark.read.csv("/user/admin/sf-fire-calls.csv").show()

In [30]:
%pyspark

# create DataFrame from python list. It can infer schema for you.
df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country")
df1.printSchema()
df1.show()

In [31]:
%pyspark
import os

from pyspark.sql.types import IntegerType, StringType, StructType

# The example files ship with the Spark distribution, so SPARK_HOME has to be set.
# Fail with a clear message instead of a TypeError from str + None further down.
SPARK_HOME = os.getenv('SPARK_HOME')
if not SPARK_HOME:
    raise RuntimeError("SPARK_HOME is not set; cannot locate the Spark example resources")

# Local-filesystem location of the bundled examples.
# Use an hdfs path instead if the data lives on HDFS.
examples_dir = "file://" + SPARK_HOME + "/examples/src/main/resources"

# Read data from a json file
# (people.json: https://github.com/apache/spark/blob/master/examples/src/main/resources/people.json)
df1 = spark.read.json(examples_dir + "/people.json")
df1.printSchema()
df1.show()

# Read data from a csv file. Reader behavior is customized via spark.read.options;
# here the separator and the header handling are overridden.
df2 = spark.read.options(sep=";", header=True).csv(examples_dir + "/people.csv")
df2.printSchema()
df2.show()

# Specify the schema explicitly instead of reading every column as a string.
schema = StructType().add("name", StringType(), True) \
    .add("age", IntegerType(), True) \
    .add("job", StringType(), True)

df3 = spark.read.options(sep=";", header=True) \
    .schema(schema) \
    .csv(examples_dir + "/people.csv")
df3.printSchema()
df3.show()



In [32]:
%pyspark

import pyspark.sql.functions as F

df1 = spark.createDataFrame(
    [(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]
).toDF("id", "name", "age", "country")

# The same column expression feeds both derived DataFrames below.
age_plus_one = df1["age"] + 1

# withColumn appends a column when the given name is new ...
df2 = df1.withColumn("age2", age_plus_one)
df2.show()

# ... and replaces the existing column when the name is already taken.
df3 = df1.withColumn("age", age_plus_one)
df3.show()

# Built-in functions can produce the new column as well. They are referenced as
# F.upper rather than imported by bare name, because some Spark function names
# (e.g. max) conflict with Python builtins.
df4 = df1.withColumn("name", F.upper(df1["name"]))
df4.show()

In [33]:
%pyspark
people_rows = [
    (1, "andy", 20, "USA"),
    (2, "jeff", 23, "China"),
    (3, "james", 18, "USA"),
]
df1 = spark.createDataFrame(people_rows).toDF("id", "name", "age", "country")

# drop returns a new DataFrame without the named column.
df2 = df1.drop("id")
df2.show()


In [34]:
// No %interpreter directive: this paragraph runs on the note's default interpreter.
// Print the first rows of homework.bank as a text table.
spark.table("homework.bank").show()

In [35]:
%pyspark

import pyspark.sql.functions as F

df1 = spark.createDataFrame(
    [(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]
).toDF("id", "name", "age", "country")

# select accepts plain column names ...
df2 = df1.select("id", "name")
df2.show()

# ... as well as Column expressions, including function calls and arithmetic.
selected_columns = [df1["id"], F.upper(df1["name"]), df1["age"] + 1]
df3 = df1.select(*selected_columns)
df3.show()




In [36]:
%pyspark

df1 = spark.createDataFrame(
    [(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]
).toDF("id", "name", "age", "country")

# Each predicate is a boolean Column expression.
age_at_least_20 = df1["age"] >= 20
country_is_china = df1["country"] == "China"

# filter takes a boolean Column.
df2 = df1.filter(age_at_least_20)
df2.show()

# Column predicates are combined with "&"; Python's "and" (or "&&") does not work here.
df3 = df1.filter(age_at_least_20 & country_is_china)
df3.show()








In [40]:
%pyspark

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]) \
            .toDF("id", "name", "age", "country")

# Create a udf from a python lambda. Spark passes SQL NULL to Python as None,
# so the lambda returns None for it instead of failing on None.upper().
udf1 = udf(lambda e: e.upper() if e is not None else None)
df2 = df1.select(udf1(df1["name"]))
df2.show()


# A udf can also be used in filter; in that case the return type must be Boolean.
# The decorator form is an alternative way to create a udf.
@udf(returnType=BooleanType())
def udf2(e):
    """Return True when ``e`` is at least 20; a null (None) input yields False."""
    return e is not None and e >= 20


df3 = df1.filter(udf2(df1["age"]))
df3.show()

# A udf can accept more than one argument; the result is null if either input is null.
udf3 = udf(lambda e1, e2: None if e1 is None or e2 is None else e1 + "_" + e2)
df4 = df1.select(udf3(df1["name"], df1["country"]).alias("name_country"))
df4.show()


In [41]:
%pyspark

import pyspark.sql.functions as F

df1 = spark.createDataFrame(
    [(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]
).toDF("id", "name", "age", "country")

# Aggregate helpers such as count/min/max/avg/sum can be called directly on the grouping.
df2 = df1.groupBy("country").count()
df2.show()

# A dict of {column: aggregate name} performs several aggregations at once ...
df3 = df1.groupBy("country").agg({"age": "avg", "id": "count"})
df3.show()

# ... and so does a sequence of aggregate Column expressions.
avg_age = F.avg(df1["age"]).alias("avg_age")
id_count = F.count(df1["id"]).alias("count")
df4 = df1.groupBy("country").agg(avg_age, id_count)
df4.show()

# Dict keys must be unique, so a dict cannot express two aggregations over the same
# column; aggregate expressions have to be used in that case.
max_age = F.max(df1["age"]).alias("max_age")
df5 = df1.groupBy("country").agg(avg_age, max_age)
df5.show()








In [43]:
%pyspark

# People reference their country through c_id.
df1 = spark.createDataFrame(
    [(1, "andy", 20, 1), (2, "jeff", 23, 2), (3, "james", 18, 3)]
).toDF("id", "name", "age", "c_id")
df1.show()

# Country lookup table; there is no row for c_id 3.
df2 = spark.createDataFrame(
    [(1, "USA"), (2, "China")]
).toDF("c_id", "c_name")
df2.show()

# When both sides share the key name, passing that name is enough.
df3 = df1.join(df2, "c_id")
df3.show()

# An explicit join condition also covers keys that are named differently in each table.
c_id_matches = df1["c_id"] == df2["c_id"]
df4 = df1.join(df2, c_id_matches)
df4.show()

# The join type follows the condition; without it the join is an inner join.
df5 = df1.join(df2, c_id_matches, "left_outer")
df5.show()

In [44]:
%pyspark

df1 = spark.createDataFrame(
    [("andy", 20, 1, 1), ("jeff", 23, 1, 2), ("james", 12, 2, 2)]
).toDF("name", "age", "key_1", "key_2")
df1.show()

df2 = spark.createDataFrame(
    [(1, 1, "USA"), (2, 2, "China")]
).toDF("key_1", "key_2", "country")
df2.show()

# Join on two fields: key_1 and key_2.

# A list of field names works when the join fields are named the same in both tables.
join_keys = ["key_1", "key_2"]
df3 = df1.join(df2, join_keys)
df3.show()

# An explicit condition also works when the join fields are named differently.
key_1_matches = df1["key_1"] == df2["key_1"]
key_2_matches = df1["key_2"] == df2["key_2"]
df4 = df1.join(df2, key_1_matches & key_2_matches)
df4.show()



In [45]:
%pyspark

df1 = spark.createDataFrame(
    [(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]
).toDF("id", "name", "age", "country")

# A DataFrame must be registered as a temp view before SQL can refer to it by name.
df1.createOrReplaceTempView("people")

# SparkSession.sql returns a DataFrame.
df2 = spark.sql("select name, age from people")
df2.show()

# A Python function has to be registered under a name before SQL can call it.
spark.udf.register("udf1", lambda e: e.upper())
df3 = spark.sql("select udf1(name), age from people")
df3.show()

In [48]:
%pyspark

# DDL schema for the sample blog data; Campaigns is an array of strings.
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"
# Create our static data: one list per row, in the column order of the schema.
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
    [2, "Brooke", "Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
    [5, "Matei", "Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]],
]

# Create a DataFrame using the schema defined above
blogs_df = spark.createDataFrame(data, schema)
# Show the DataFrame; it should reflect the rows above
blogs_df.show()
# printSchema() prints the schema itself and returns None, so it is not wrapped in
# print() — doing so would emit an extra "None" line.
blogs_df.printSchema()

In [49]:
%pyspark
# Import the functions module under an alias instead of `import *`: the wildcard
# form would shadow Python builtins such as max, min and sum for the rest of the note.
import pyspark.sql.functions as F


# Read the file into a Spark DataFrame using the CSV format, inferring the
# schema and treating the first line as a header that provides the column
# names for the comma-separated fields.
mnm_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(nmn_dataset_path)
)

# Only the high-level DataFrame API is used; each call returns a new DataFrame,
# so the calls can be chained:
# 1. select the fields "State", "Color", and "Count"
# 2. group by State and Color
# 3. count the rows with a non-null Count in each (State, Color) group as "Total"
# 4. order by Total, descending
count_mnm_df = (
    mnm_df
    .select("State", "Color", "Count")
    .groupBy("State", "Color")
    .agg(F.count("Count").alias("Total"))
    .orderBy("Total", ascending=False)
)

# Show the resulting aggregation for all states and colors.
# show() is an action, so it triggers execution of the query above.
count_mnm_df.show(n=60, truncate=False)
# count() is a second action and runs the query again to get the number of groups.
print("Total Rows = %d" % (count_mnm_df.count()))