# !Getting started

## !!Starting Point: SparkSession

In [None]:
from pyspark.sql import SparkSession

#### 在spark2.0以后，SparkSession创建SQLContext，并封装了sparkContext,  config就是sparkconf

In [None]:
# Build the session used by the rest of the notebook; getOrCreate() hands back
# an already-running session when one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("python spark sql example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

## !!creating dataFrames

In [None]:
df=spark.read.json("/user/gongxf/spark/examples/src/main/resources/people.json")

In [None]:
# 会在后台生成parquet文件，但是sql没有对应的元数据
#df.write.format("parquet").mode("append").saveAsTable("people1")

In [None]:
df.show()

## !!Untyped Dataset Operations (aka DataFrame Operations)

In [None]:
df.printSchema()

In [None]:
df.select("name").show()

In [None]:
df.select(df['name'],df['age']+1).show()

In [None]:
df.filter(df['age']>21).show()

In [None]:
df.groupBy("age").count().show()

## !!Running SQL queries programmatically

In [None]:
df.createOrReplaceTempView("people3")

In [None]:
sqlDF=spark.sql("select * from people3")

In [None]:
sqlDF.show()

## !!Global Temporary view

In [None]:
df.createGlobalTempView("people4")

In [None]:
spark.sql("select * from global_temp.people4").show()

In [None]:
spark.newSession().sql("select * from global_temp.people4").show()

## !! Interoperating with RDDs

### !!! inferring the schema using reflection

In [None]:
from pyspark.sql import Row

In [None]:
sc=spark.sparkContext

In [None]:
lines=sc.textFile("/user/gongxf/spark/examples/src/main/resources/people.txt")

In [None]:
parts=lines.map(lambda l:l.split(","))

In [None]:
people=parts.map(lambda p:Row(name=p[0],age=int(p[1])))

In [None]:
schemaPeople=spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

In [None]:
teenageres=spark.sql("select name from people where age>=13 and age<=19")

In [None]:
teenageres.show()

In [None]:
# Drop to the underlying RDD of Rows and format each teenager's name for printing.
# The label needs its own separator, otherwise it runs into the value ("nameJustin").
teenNames=teenageres.rdd.map(lambda p:"Name: "+p.name).collect()

In [None]:
for name in teenNames:
    print(name)

### !!! programmatically specifying the schema

In [None]:
from pyspark.sql.types import *

In [None]:
sc=spark.sparkContext

In [None]:
lines=sc.textFile("/user/gongxf/spark/examples/src/main/resources/people.txt")

In [None]:
parts=lines.map(lambda l:l.split(","))
people=parts.map(lambda p:(p[0],p[1].strip()))

In [None]:
schemaString="name age"

In [None]:
fields=[StructField(field_name,StringType(),True) for field_name in schemaString.split()]

In [None]:
schema=StructType(fields)

In [None]:
schemaPeople=spark.createDataFrame(people,schema)

In [None]:
schemaPeople.createOrReplaceTempView("people")

In [None]:
results=spark.sql("select * from people")

In [None]:
results.show()

# !Data sources

## !! Generic load/save functions

In [None]:
df=spark.read.load("/user/gongxf/spark/examples/src/main/resources/users.parquet")

In [None]:
df.show()

In [None]:
df.select("name","favorite_color").write.save("/user/gongxf/spark/examples/src/main/resources/namesAndFavColors1.parquet")

### !!!Manually Specifying Options

In [None]:
df=spark.read.load("/user/gongxf/spark/examples/src/main/resources/people.json"
                  ,format="json")

In [None]:
df.select("name","age").write.save("namesAndAgess1.parquet")

In [None]:
df=spark.read.load("/user/gongxf/spark/examples/src/main/resources/people.csv"
                  ,format='csv',sep=";",inferSchema='true',header="true")

In [None]:
df.show()

### !!!run sql on files directly

In [None]:
# Query the Parquet file in place via the parquet.`<path>` syntax instead of
# loading it into a DataFrame and registering a view first.
# NOTE(review): this reads namesAndAgess.parquet, while the earlier write cell
# saves "namesAndAgess1.parquet" -- confirm the two paths are meant to differ.
df=spark.sql("select * from parquet.`/user/gongxf/namesAndAgess.parquet`")

In [None]:
df.show()

### !!! save modes

### !!! Saving to Persistent tables

###  !!! bucketing,sorting and partitioning

In [None]:
df.show()

In [None]:
df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed1")

In [None]:
df=spark.read.parquet("/user/gongxf/spark/examples/src/main/resources/users.parquet")

In [None]:
df.write.partitionBy("favorite_color").bucketBy(42,"name")\
.saveAsTable("people_partitioned_bucketed")

## !!parquet files

### !!!loading data programmatically

In [None]:
peopleDF=spark.read.json("/user/gongxf/spark/examples/src/main/resources/people.json")

In [None]:
peopleDF.write.parquet("people11.parquet")

In [None]:
# Read back the Parquet output that the preceding cell wrote ("people11.parquet"),
# so the cell works on a fresh run and never picks up a stale file.
parquetFile=spark.read.parquet("people11.parquet")

In [None]:
parquetFile.createOrReplaceTempView("parquetFile")

In [None]:
teenagers=spark.sql("select name from parquetFile where age>=12 and age<=19")

In [None]:
# Show the result of the parquetFile query computed just above (`teenagers`),
# not the similarly named `teenageres` DataFrame left over from the RDD section.
teenagers.show()

### !!!Schema merging

In [None]:
from pyspark.sql import Row

In [None]:
sc=spark.sparkContext

In [None]:
squaresDF=spark.createDataFrame(sc.parallelize(range(1,6))\
            .map(lambda i:Row(single=i,double=i**2)))

In [None]:
squaresDF.show()

In [None]:
squaresDF.write.parquet("data/test_table1/key=1")

In [None]:
cubesDF=spark.createDataFrame(sc.parallelize(range(6,11))\
        .map(lambda i:Row(single=i,triple=i**3)))

In [None]:
cubesDF.write.parquet("data/test_table1/key=2")

In [None]:
# Read the partition root that the key=1 and key=2 writes above populated
# ("data/test_table1"); mergeSchema reconciles the differing column sets.
mergedDF=spark.read.option("mergeSchema","true").parquet("data/test_table1")

In [None]:
mergedDF.printSchema()

## Hive metastore Parquet table conversion

#### Metadata Refreshing

In [None]:
# spark.catalog.refreshTable("aa")

### !!!json Datasets

In [None]:
sc=spark.sparkContext

In [None]:
path="/user/gongxf/spark/examples/src/main/resources/people.json"

In [None]:
peopleDF=spark.read.json(path)

In [None]:
peopleDF.printSchema()

In [None]:
peopleDF.createOrReplaceTempView("people")

In [None]:
teenagerNamesDF=spark.sql("select name from people where age between 13 and 19")

In [None]:
teenagerNamesDF.show()

In [None]:
jsonStrings=['{"name":"Yin","address":{"city":"columbus","state":"ohio"}}']

In [None]:
otherPeopleRDD=sc.parallelize(jsonStrings)

In [None]:
otherPeople=spark.read.json(otherPeopleRDD)

In [None]:
otherPeople.show()

## !!hive table

In [1]:
from os.path import expanduser,join,abspath

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [3]:
warehouse_localtion=abspath('spark-warehouse')

In [4]:
# Hive-enabled session: the warehouse directory is taken from warehouse_localtion
# and enableHiveSupport() turns on Hive table support for the SQL cells below.
spark = (
    SparkSession.builder
    .appName("python spark")
    .config("spark.sql.warehouse.dir", warehouse_localtion)
    .enableHiveSupport()
    .getOrCreate()
)

In [5]:
spark.sql('create database if not exists kk')
spark.sql("use kk")

DataFrame[]

In [6]:
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")

DataFrame[]

In [8]:
spark.sql("load data local inpath 'data/resources/kv1.txt' into table src")

DataFrame[]

In [9]:
spark.sql("select * from src").show()

+---+-------+
|key|  value|
+---+-------+
|238|val_238|
| 86| val_86|
|311|val_311|
| 27| val_27|
|165|val_165|
|409|val_409|
|255|val_255|
|278|val_278|
| 98| val_98|
|484|val_484|
|265|val_265|
|193|val_193|
|401|val_401|
|150|val_150|
|273|val_273|
|224|val_224|
|369|val_369|
| 66| val_66|
|128|val_128|
|213|val_213|
+---+-------+
only showing top 20 rows



In [10]:
spark.sql("select count(*) from src").show()

+--------+
|count(1)|
+--------+
|     500|
+--------+



In [11]:
sqlDF=spark.sql("select key,value from src where key<10 order by key")

In [12]:
sqlDF.show()

+---+-----+
|key|value|
+---+-----+
|  0|val_0|
|  0|val_0|
|  0|val_0|
|  2|val_2|
|  4|val_4|
|  5|val_5|
|  5|val_5|
|  5|val_5|
|  8|val_8|
|  9|val_9|
+---+-----+



In [13]:
stringsDS=sqlDF.rdd.map(lambda row:"key:%d,Value:%s" %(row.key,row.value))

In [14]:
for record in stringsDS.collect():
    print(record)

key:0,Value:val_0
key:0,Value:val_0
key:0,Value:val_0
key:2,Value:val_2
key:4,Value:val_4
key:5,Value:val_5
key:5,Value:val_5
key:5,Value:val_5
key:8,Value:val_8
key:9,Value:val_9


In [16]:
Record=Row("key","value")

In [18]:
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])

In [19]:
recordsDF.createOrReplaceTempView("records")

In [20]:
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

+---+------+---+------+
|key| value|key| value|
+---+------+---+------+
|  2| val_2|  2| val_2|
|  4| val_4|  4| val_4|
|  5| val_5|  5| val_5|
|  5| val_5|  5| val_5|
|  5| val_5|  5| val_5|
|  8| val_8|  8| val_8|
|  9| val_9|  9| val_9|
| 10|val_10| 10|val_10|
| 11|val_11| 11|val_11|
| 12|val_12| 12|val_12|
| 12|val_12| 12|val_12|
| 15|val_15| 15|val_15|
| 15|val_15| 15|val_15|
| 17|val_17| 17|val_17|
| 18|val_18| 18|val_18|
| 18|val_18| 18|val_18|
| 19|val_19| 19|val_19|
| 20|val_20| 20|val_20|
| 24|val_24| 24|val_24|
| 24|val_24| 24|val_24|
+---+------+---+------+
only showing top 20 rows



## !!PySpark Usage Guide for Pandas with Apache Arrow

### !!!Enabling for Conversion to/from Pandas

In [21]:
import numpy as np

In [22]:
import pandas as pd

In [23]:
spark.conf.set("spark.sql.execution.arrow.enabled","true")

In [24]:
pdf=pd.DataFrame(np.random.rand(100,3))

In [25]:
pdf.head(10)

Unnamed: 0,0,1,2
0,0.610143,0.062348,0.552398
1,0.072182,0.948571,0.419051
2,0.825324,0.875623,0.812464
3,0.818682,0.221526,0.024691
4,0.90769,0.281713,0.199206
5,0.532066,0.713848,0.667376
6,0.620257,0.992436,0.588271
7,0.027993,0.230321,0.80514
8,0.876593,0.91331,0.820498
9,0.365606,0.732847,0.823639


In [26]:
df=spark.createDataFrame(pdf)

In [27]:
df.show()

+-------------------+-------------------+--------------------+
|                  0|                  1|                   2|
+-------------------+-------------------+--------------------+
| 0.6101431789049504|0.06234844598989331|  0.5523980407074291|
|0.07218175825447093| 0.9485706806828882|  0.4190506541490674|
| 0.8253240977742736| 0.8756232847167956|  0.8124638348598774|
| 0.8186818814831307| 0.2215257066541919|0.024691159386287853|
| 0.9076902770347569|0.28171309345571827| 0.19920598524486732|
| 0.5320658512277292| 0.7138484399275066|   0.667375603665374|
|  0.620256595742859| 0.9924355187259655|  0.5882708823580115|
|0.02799262668600533|0.23032075629775117|  0.8051401037152349|
|  0.876592595650472|  0.913309555829855|  0.8204976156955212|
| 0.3656060180579992| 0.7328469244149061|  0.8236387801099869|
| 0.6418553838937707| 0.9459684132652991|  0.5722678383271634|
| 0.5402814667023386|  0.354295245422354| 0.22549177926773523|
| 0.6988836400249732| 0.6856692138941141| 0.07988980475

In [28]:
result_pdf=df.select("*").toPandas()

In [29]:
result_pdf.head(3)

Unnamed: 0,0,1,2
0,0.610143,0.062348,0.552398
1,0.072182,0.948571,0.419051
2,0.825324,0.875623,0.812464


### !!!Pandas UDFs (a.k.a. Vectorized UDFs)

In [30]:
import pandas as pd

In [31]:
from pyspark.sql.functions import col,pandas_udf

In [32]:
from pyspark.sql.types import LongType

In [33]:
def multiply_func(a, b):
    """Return the product ``a * b``.

    Relies only on the ``*`` operator, so it works for plain numbers and,
    element-wise, for pandas Series (which is how the pandas UDF calls it).
    """
    product = a * b
    return product

In [34]:
multiply=pandas_udf(multiply_func,returnType=LongType())

In [35]:
x=pd.Series([1,2,3])

In [36]:
x

0    1
1    2
2    3
dtype: int64

In [37]:
print(multiply_func(x,x))

0    1
1    4
2    9
dtype: int64


In [38]:
df=spark.createDataFrame(pd.DataFrame(x,columns=["x"]))

In [39]:
df.select(col("x")).show()

+---+
|  x|
+---+
|  1|
|  2|
|  3|
+---+



In [40]:
df.select(multiply(col("x"),col("x"))).show()

+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  4|
|                  9|
+-------------------+



## !!Grouped map

In [41]:
from pyspark.sql.functions import pandas_udf,PandasUDFType

In [42]:
# Small (id, v) sample with two groups, used by the grouped-map example.
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"),
)

In [43]:
df.show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+



In [44]:
@pandas_udf("id long,v double",PandasUDFType.GROUPED_MAP)
def substract_mean(pdf):
    """Center column ``v`` of one group around that group's mean.

    ``pdf`` is the pandas DataFrame for a single group; the returned frame
    keeps every column and replaces ``v`` with ``v - mean(v)``.
    """
    centered = pdf.v - pdf.v.mean()
    return pdf.assign(v=centered)

In [45]:
df.groupby("id").apply(substract_mean).show()

+---+----+
| id|   v|
+---+----+
|  1|-0.5|
|  1| 0.5|
|  2|-3.0|
|  2|-1.0|
|  2| 4.0|
+---+----+



In [46]:
df.show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+

