In [1]:
import pandas as pd
import findspark
findspark.init()
import pyspark
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.functions import *

In [2]:
sc = SparkContext(appName="app")

In [3]:
sqlContext = SQLContext(sc)

In [4]:
data = [('Amy',25),('Joe',30),('John',22),('Jane',20),('Tim',26) ]

In [5]:
rdd = sc.parallelize(data)

In [6]:
pySparkDF = sqlContext.createDataFrame(rdd, ['name', 'age'])
# pySparkDF2 = sqlContext.createDataFrame(rdd)
print(pySparkDF)
print(pySparkDF.collect())

DataFrame[name: string, age: bigint]
[Row(name='Amy', age=25), Row(name='Joe', age=30), Row(name='John', age=22), Row(name='Jane', age=20), Row(name='Tim', age=26)]


In [7]:
pandasDF = pd.DataFrame(data, columns=['name', 'age'])
print(pandasDF)
print(pandasDF.columns)

   name  age
0   Amy   25
1   Joe   30
2  John   22
3  Jane   20
4   Tim   26
Index(['name', 'age'], dtype='object')


In [8]:
pySparkDF

DataFrame[name: string, age: bigint]

In [9]:
pandasDF

Unnamed: 0,name,age
0,Amy,25
1,Joe,30
2,John,22
3,Jane,20
4,Tim,26


In [10]:
pySparkDF.collect()

[Row(name='Amy', age=25),
 Row(name='Joe', age=30),
 Row(name='John', age=22),
 Row(name='Jane', age=20),
 Row(name='Tim', age=26)]

In [11]:
pySparkDF.toPandas()

Unnamed: 0,name,age
0,Amy,25
1,Joe,30
2,John,22
3,Jane,20
4,Tim,26


In [12]:
pySparkDF.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [13]:
pySparkDF.describe().show()

+-------+----+-----------------+
|summary|name|              age|
+-------+----+-----------------+
|  count|   5|                5|
|   mean|null|             24.6|
| stddev|null|3.847076812334269|
|    min| Amy|               20|
|    max| Tim|               30|
+-------+----+-----------------+



In [14]:
pySparkDF[(pySparkDF['age'] >= 22) & (pySparkDF['name'] != 'John')].show()
pandasDF[(pandasDF['age'] >= 22) & (pandasDF['name'] != 'John')]

+----+---+
|name|age|
+----+---+
| Amy| 25|
| Joe| 30|
| Tim| 26|
+----+---+



Unnamed: 0,name,age
0,Amy,25
1,Joe,30
4,Tim,26


In [15]:
print('Spark:\n')
pySparkDF.select('name',(pySparkDF['age'] >= 22) & (pySparkDF['name'] != 'John')).show()
pySparkDF[(pySparkDF['age'] >= 22) & (pySparkDF['name'] != 'John')].select('name').show()

print('Pandas:\n')
print(pandasDF['name'][(pandasDF['age'] >= 22) & (pandasDF['name'] != 'John')])

Spark:

+----+-------------------------------------+
|name|((age >= 22) AND (NOT (name = John)))|
+----+-------------------------------------+
| Amy|                                 true|
| Joe|                                 true|
|John|                                false|
|Jane|                                false|
| Tim|                                 true|
+----+-------------------------------------+

+----+
|name|
+----+
| Amy|
| Joe|
| Tim|
+----+

Pandas:

0    Amy
1    Joe
4    Tim
Name: name, dtype: object


In [16]:
spark_df1 = pySparkDF.alias("spark_df1") 
spark_df2 = pySparkDF.alias("spark_df2")
joined_df = spark_df1.join(spark_df2, spark_df1.name == spark_df2.name, 'inner')

In [17]:
joined_df.show()

+----+---+----+---+
|name|age|name|age|
+----+---+----+---+
| Joe| 30| Joe| 30|
| Amy| 25| Amy| 25|
|John| 22|John| 22|
| Tim| 26| Tim| 26|
|Jane| 20|Jane| 20|
+----+---+----+---+



In [18]:
pandas_df1 = pandasDF
pandas_df2 = pandasDF
pandas_df1.join(pandas_df2, rsuffix="_df2")

Unnamed: 0,name,age,name_df2,age_df2
0,Amy,25,Amy,25
1,Joe,30,Joe,30
2,John,22,John,22
3,Jane,20,Jane,20
4,Tim,26,Tim,26


In [19]:
spark_df1.select(spark_df1['age'] > 20).show()
spark_df1[spark_df1['age'] > 20].show()
row_list = spark_df1[spark_df1['age'] > 20]

+----------+
|(age > 20)|
+----------+
|      true|
|      true|
|      true|
|     false|
|      true|
+----------+

+----+---+
|name|age|
+----+---+
| Amy| 25|
| Joe| 30|
|John| 22|
| Tim| 26|
+----+---+



In [20]:
spark_df1.collect()[:3]

[Row(name='Amy', age=25), Row(name='Joe', age=30), Row(name='John', age=22)]

In [21]:
pandas_df1[:2]

Unnamed: 0,name,age
0,Amy,25
1,Joe,30


In [22]:
df_3 = spark_df1.union(spark_df2)
df_3.show()

+----+---+
|name|age|
+----+---+
| Amy| 25|
| Joe| 30|
|John| 22|
|Jane| 20|
| Tim| 26|
| Amy| 25|
| Joe| 30|
|John| 22|
|Jane| 20|
| Tim| 26|
+----+---+



In [23]:
pd.concat([pandas_df1, pandas_df2])

Unnamed: 0,name,age
0,Amy,25
1,Joe,30
2,John,22
3,Jane,20
4,Tim,26
0,Amy,25
1,Joe,30
2,John,22
3,Jane,20
4,Tim,26


In [24]:
spark_df1.head(2)

[Row(name='Amy', age=25), Row(name='Joe', age=30)]

In [25]:
nested_data = [('Amy',25, ['Joe', 'Jane']),('Joe',30, ['Amy']),('John',22, ['Tim']),
               ('Jane',20, ['Amy', 'Tim']),('Tim',26, ['Jane', 'John']) ]

In [26]:
nested_rdd = sc.parallelize(nested_data)
spark_nested_df = sqlContext.createDataFrame(nested_rdd, ['name', 'age', 'friends'])

In [27]:
spark_nested_df.show()

+----+---+------------+
|name|age|     friends|
+----+---+------------+
| Amy| 25| [Joe, Jane]|
| Joe| 30|       [Amy]|
|John| 22|       [Tim]|
|Jane| 20|  [Amy, Tim]|
| Tim| 26|[Jane, John]|
+----+---+------------+



In [28]:
explodeDF = spark_nested_df.withColumn("friend",explode("friends"))
explodeDF.show()
explodeDF.drop("friends").show()

+----+---+------------+------+
|name|age|     friends|friend|
+----+---+------------+------+
| Amy| 25| [Joe, Jane]|   Joe|
| Amy| 25| [Joe, Jane]|  Jane|
| Joe| 30|       [Amy]|   Amy|
|John| 22|       [Tim]|   Tim|
|Jane| 20|  [Amy, Tim]|   Amy|
|Jane| 20|  [Amy, Tim]|   Tim|
| Tim| 26|[Jane, John]|  Jane|
| Tim| 26|[Jane, John]|  John|
+----+---+------------+------+

+----+---+------+
|name|age|friend|
+----+---+------+
| Amy| 25|   Joe|
| Amy| 25|  Jane|
| Joe| 30|   Amy|
|John| 22|   Tim|
|Jane| 20|   Amy|
|Jane| 20|   Tim|
| Tim| 26|  Jane|
| Tim| 26|  John|
+----+---+------+

