# Create dataframe

## Create Dataframe from JSON file

In [None]:
# Load the JSON file into a DataFrame (Spark infers the schema from the
# records) and print its rows.
people_json_path = 'data/people.json'
df = sqlContext.read.json(people_json_path)
df.show()

## Convert RDD to Dataframe

In [None]:
from pyspark.sql import Row

# Read the raw text file; each RDD element is one line of the file.
rdd = sc.textFile('data/people.txt')
# print(...) with a single argument behaves the same on Python 2 and 3,
# whereas the bare `print x` statement is a SyntaxError on Python 3.
print(rdd.collect())

# Split every line on commas into its [name, age] fields.
pairs = rdd.map(lambda line: line.split(','))
print(pairs.collect())

# Turn each field list into a Row with a string name and an integer age,
# then let Spark infer the DataFrame schema from those Rows.
people = pairs.map(lambda fields: Row(name=fields[0], age=int(fields[1])))
people_df = sqlContext.createDataFrame(people)
people_df.show()

## Convert an RDD of Rows to a DataFrame with an explicit schema

In [None]:
from pyspark.sql.types import *

# Explicit schema: a nullable long "age" column followed by a nullable
# string "name" column.
schema = StructType(
    [
        StructField('age', LongType(), True),
        StructField('name', StringType(), True)
    ]
)
# Emit plain tuples in schema order instead of passing the Rows directly.
# createDataFrame may match values to the schema by position, and the field
# order of Row(name=..., age=...) depends on the Spark version (sorted
# alphabetically before 3.0, keyword order from 3.0 on), so relying on it
# can put the name string into the LongType column.
people_in_schema_order = people.map(lambda person: (person.age, person.name))
people_df2 = sqlContext.createDataFrame(people_in_schema_order, schema)
people_df2.show()

In [None]:
# List every name defined in pyspark.sql.types to see which data types
# are available for building schemas.
import pyspark.sql.types as types
dir(types)

# Work with Dataframe

## select

In [None]:
# Project the DataFrame down to its "name" column and print the result.
name_only = df.select("name")
name_only.show()

In [None]:
# Select the name together with a derived column: every age increased by 10.
age_plus_ten = df["age"] + 10
df.select(df["name"], age_plus_ten).show()

## filter

In [None]:
# Keep only the rows whose age is strictly greater than 21.
is_over_21 = df['age'] > 21
df.filter(is_over_21).show()

## groupby

In [None]:
# Count how many rows share each distinct name.
rows_by_name = df.groupBy("name")
rows_by_name.count().show()

In [None]:
# Count how many rows share each distinct age.
rows_by_age = df.groupBy("age")
rows_by_age.count().show()

# Add a column to Dataframe

In [None]:
# Append a derived column "age2" (age + 2), referring to the source column
# with attribute-style access.
age_plus_two = df.age + 2
df.withColumn('age2', age_plus_two).show()

In [None]:
# Same derived column as above, but referring to the source column with
# item-style (bracket) access.
age_plus_two = df['age'] + 2
df.withColumn('age2', age_plus_two).show()

In [None]:
from pyspark.sql.functions import udf

def display(name, age):
    """Return ``name`` and the string form of ``age`` joined by a slash.

    For example, ``display('Andy', 30)`` gives ``'Andy/30'``.
    """
    return '/'.join((name, str(age)))

# Wrap the plain Python function so it can be applied to DataFrame columns.
display_udf = udf(display)

# Add a "display" column computed row by row from the name and age columns.
display_column = display_udf(df.name, df.age)
df.withColumn('display', display_column).show()

## Drop a column

In [None]:
# drop() returns a new DataFrame without the "age" column ...
without_age = df.drop("age")
without_age.show()
# ... while the original DataFrame is shown afterwards for comparison.
df.show()

## Drop duplicate rows

In [None]:
from pyspark.sql import Row
# Sample data: the first two rows are identical; the third differs only in age.
rows_with_duplicates = [
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=10, height=80),
]
df_with_dup = sc.parallelize(rows_with_duplicates).toDF()
df_with_dup.show()

In [None]:
# Remove rows that are exact duplicates across every column.
deduplicated = df_with_dup.drop_duplicates()
deduplicated.show()

In [None]:
# Remove rows that are duplicates when only the "name" and "height"
# columns are compared.
dedup_columns = ['name', 'height']
df_with_dup.drop_duplicates(dedup_columns).show()

## Join Dataframe

In [None]:
from pyspark.sql import Row
# Left-hand table for the join examples: one age per name.
age_rows = [
    Row(name='Alice', age=5),
    Row(name='Tom', age=3),
    Row(name='Jerry', age=10),
]
df1 = sc.parallelize(age_rows).toDF()

df1.show()

# Right-hand table: one height per name. Alice and Tom appear in both
# tables, Jerry only in df1 and John only in df2.
height_rows = [
    Row(name='Alice', height=80),
    Row(name='Tom', height=75),
    Row(name='John', height=60),
]
df2 = sc.parallelize(height_rows).toDF()
df2.show()

In [None]:
# Inner join: keep only the names that appear in both df1 and df2.
names_match = df1.name == df2.name
inner_joined = df1.join(df2, names_match, 'inner')
inner_joined.select(df1.name, 'age', 'height').show()

In [None]:
# Left join: keep every df1 row, attaching the df2 row with the same name
# where one exists.
# Select df1.name rather than df2.name: for a df1 row with no match in df2
# (Jerry), df2.name is null, so the person's name would be lost in the output.
df1.join(df2, df1.name == df2.name, 'left').select(df1.name, 'age', 'height').show()

In [None]:
# Full outer join: keep the rows of both tables, matched by name where
# possible. Both "name" columns are kept in the output.
names_match = df1.name == df2.name
df1.join(df2, names_match, 'outer').show()

## Rename a column

In [None]:
# Show the DataFrame with its "age" column renamed to "year".
renamed = df.withColumnRenamed('age', 'year')
renamed.show()

# Convert Dataframe to Pandas

In [None]:
# Convert the Spark DataFrame into a pandas DataFrame held in `pandas_df`.
pandas_df = df.toPandas()
# A bare expression on the last line of a notebook cell is rendered as the
# cell's output.
pandas_df

# Describe Dataframes

In [None]:
# Compute summary statistics for the DataFrame and render them as a pandas
# table (the last expression in the cell is displayed by the notebook).
summary = df.describe()
summary.toPandas()