In [1]:
# Build an RDD of three JSON strings, one complete JSON object per element,
# which the next cell parses into a DataFrame with spark.read.json.
# Each object carries the fields id (a JSON string), name, age (a JSON number)
# and eyeColor. The strings span several lines; that whitespace is part of the
# data, so the literals below are left exactly as written.
stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

In [2]:
# Parse each JSON string in the RDD into one row; Spark infers the schema.
stringJSONDF = spark.read.json(stringJSONRDD)

In [3]:
# Print the parsed JSON records as a text table.
stringJSONDF.show()

In [4]:
# Register the DataFrame under the name `stringJSON` so it can be queried with SQL.
stringJSONDF.createOrReplaceTempView("stringJSON")

In [5]:
# Query the temp view with SQL and print the people older than 20.
(
  spark.sql("SELECT * FROM stringJSON WHERE age > 20")
  .show()
)

In [6]:
# Distribute (id, name, age, eyeColor) tuples as an RDD; the schema is applied later.
stringCSVRDD = sc.parallelize([
  (123, 'Katie', 19, 'brown'),
  (234, 'Michael', 22, 'green'),
  (345, 'Simone', 23, 'blue'),
])

In [7]:
# NOTE(review): star import kept so other cells that may rely on its names keep
# working; prefer explicit imports (StructType, StructField, LongType, StringType).
from pyspark.sql.types import *

# Explicit schema for the (id, name, age, eyeColor) tuples in stringCSVRDD.
# The third StructField argument (True) marks each column as nullable.
# The unused `schemaString` variable was removed: the schema is spelled out here.
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and create the DataFrame.
swimmers = spark.createDataFrame(stringCSVRDD, schema)

In [8]:
# Print the rows of the schema-typed DataFrame.
swimmers.show()

In [9]:
# Print the column names, types and nullability set by the explicit schema.
swimmers.printSchema()

In [10]:
from pyspark.sql import Row

# Example data: employees, departments, and which employees belong to each
# department. Some people appear twice with different salaries
# (employee1/employee11, employee3/employee31) and some name parts are None.

# Employee rows share one layout, so build them from a Row "factory".
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee("michael", "armbrust", "no-reply@berkeley.edu", 100000)
employee11 = Employee("michael", "armbrust", "no-reply@berkeley.edu", 200000)
employee2 = Employee("xiangrui", "meng", "no-reply@stanford.edu", 120000)
employee3 = Employee("matei", None, "no-reply@waterloo.edu", 140000)
employee31 = Employee("matei", None, "no-reply@waterloo.edu", 180000)
employee4 = Employee(None, "wendell", "no-reply@berkeley.edu", 160000)

# Department rows, each with an `id` and a `name`.
department1 = Row(id="123456", name="Computer Science")
department2 = Row(id="789012", name="Mechanical Engineering")
department3 = Row(id="345678", name="Theater and Drama")
department4 = Row(id="901234", name="Indoor Recreation")

# Pair each department with the list of its employees.
departmentWithEmployees1 = Row(
  department=department1,
  employees=[employee1, employee2],
)
departmentWithEmployees2 = Row(
  department=department2,
  employees=[employee3, employee4, employee11],
)
departmentWithEmployees3 = Row(
  department=department3,
  employees=[employee1, employee4, employee31],
)
departmentWithEmployees4 = Row(
  department=department4,
  employees=[employee2, employee3],
)

In [11]:
# Collect the department/employee Rows into one DataFrame with a nested
# `department` struct and an `employees` array column.
departmentsWithEmployeesSeq1 = [
  departmentWithEmployees1,
  departmentWithEmployees2,
  departmentWithEmployees3,
  departmentWithEmployees4,
]
# Use the SparkSession entry point like the rest of the notebook; SQLContext
# is kept by Spark only for backward compatibility.
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)

In [12]:
# Render the nested department/employees DataFrame.
display(df1)

In [13]:
from pyspark.sql.functions import explode

# explode() turns each element of the `employees` array into its own row,
# exposed as the column `emp` next to its `department`.
df = df1.select(
  "department",
  explode("employees").alias("emp"),
)
display(df)

In [14]:
# Flatten both structs so every department and employee field becomes a top-level column.
display(df.selectExpr("department.*", "emp.*"))

In [15]:
# Small people table with deliberate duplicates: id 3 appears twice with
# identical values, and ids 1/4 share every value except the id.
df = spark.createDataFrame(
  [
    (1, 144.5, 5.9, 33, "M"),
    (2, 167.2, 5.4, 45, "M"),
    (3, 124.1, 5.2, 23, "F"),
    (4, 144.5, 5.9, 33, "M"),
    (5, 133.2, 5.7, 54, "F"),
    (3, 124.1, 5.2, 23, "F"),
    (5, 129.2, 5.3, 42, "M"),
  ],
  ["id", "weight", "height", "age", "gender"],
)
df.show()

In [16]:
# Spark DataFrames are immutable: drop_duplicates() returns a NEW DataFrame
# and leaves `df` untouched, so the result must be kept to see the effect.
# (Previously the result was discarded and the original `df` was shown.)
df_dedup = df.drop_duplicates()
df_dedup.show()

In [17]:
# Keep one row per distinct `weight`. As above, drop_duplicates() returns a
# new DataFrame, so show that result rather than the unchanged `df`.
df_dedup_weight = df.drop_duplicates(subset=['weight'])
df_dedup_weight.show()

In [18]:
# People table with missing values (None) scattered across columns;
# row 3 is missing almost everything and `income` is mostly empty.
df_miss = spark.createDataFrame(
  [
    (1, 143.5, 5.6, 28, "M", 100000),
    (2, 167.2, 5.4, 45, "M", None),
    (3, None, 5.2, None, None, None),
    (4, 144.5, 5.9, 33, "M", None),
    (5, 133.2, 5.7, 54, "F", None),
    (6, 124.1, 5.2, None, "F", None),
    (7, 129.2, 5.3, 42, "M", 76000),
  ],
  ["id", "weight", "height", "age", "gender", "income"],
)

In [19]:
# Number of missing (None) values in each row of df_miss.
# `is None` is the correct identity test for None (PEP 8); summing a generator
# of booleans counts the True values without building an intermediate list.
df_miss.rdd.map(lambda row: sum(value is None for value in row)).collect()

In [20]:
from pyspark.sql.functions import count

# count(col) skips nulls, so this shows the number of non-missing values per column.
df_miss.agg(*(count(column_name) for column_name in df_miss.columns)).show()

In [22]:
# For each row, flag which values have an all-digit string form.
# str.isdigit() already returns a bool, so the `== True` comparison was redundant.
# Note: str(None) is 'None' and floats such as '143.5' are not all digits.
df_miss.rdd.map(lambda row: [str(value).isdigit() for value in row]).collect()

In [23]:
from pyspark.sql.functions import udf

# Load the Titanic training data: the first line holds column names and Spark
# infers column types from the data.
titanicData = spark.read.csv(
  '/FileStore/tables/titanic_train-ac800.csv',
  header=True,
  inferSchema=True,
)

In [24]:
def getLen(name):
  """Return the length of `name`, or None when `name` is None.

  Used as a Spark UDF: a SQL NULL arrives as None, and len(None) would raise
  TypeError and fail the task. Returning None yields NULL in the result column.
  """
  if name is None:
    return None
  return len(name)

In [25]:
from pyspark.sql.types import IntegerType

# Wrap getLen as a Spark UDF whose results are typed IntegerType.
len_udf = udf(getLen, returnType=IntegerType())

In [26]:
# Keep every original column and append the UDF-computed name length.
name_len_column = len_udf("Name").alias("name_len")
titanic_data_len = titanicData.select("*", name_len_column)

In [27]:
# Longest passenger name, using the dict form of agg ({column: aggregate}).
(
  titanic_data_len
  .agg({"name_len": "max"})
  .collect()
)

In [28]:
def isFemale(name):
  """Guess whether a passenger is female from the title in their name.

  Returns True if `name` contains "Mrs" or "Miss", False otherwise, and None
  when `name` is None. Spark passes SQL NULL into a UDF as None; the old code
  raised TypeError on it, whereas None now yields a NULL result.
  """
  if name is None:
    return None
  return 'Mrs' in name or 'Miss' in name

In [29]:
from pyspark.sql.types import BooleanType

# Wrap isFemale as a Spark UDF returning a boolean column.
isFemale_udf = udf(isFemale, returnType=BooleanType())

In [30]:
# show() prints the table and returns None, so keep the DataFrame itself in
# titanic_data_female and display it in a separate call.
titanic_data_female = titanicData.select("Name", isFemale_udf("Name").alias("isFemale"))
titanic_data_female.show()

In [31]:
from pyspark.sql.functions import pandas_udf , PandasUDFType

import pandas as pd
from scipy import stats

In [32]:
@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_cdf(v):
  """Scalar pandas UDF: standard normal CDF of each value in the batch `v`."""
  cdf_values = stats.norm.cdf(v)
  return pd.Series(cdf_values)

In [33]:
# Print the current contents of `df`.
df.show()

In [34]:
# Small table of string ids with two integer columns, used by the pandas UDFs below.
df = spark.createDataFrame(
  [
    ('a', 2, 3),
    ('b', 4, 5),
    ('c', 6, 7),
    ('d', 8, 9),
    ('e', 10, 11),
  ],
  ['id', 'c1', 'c2'],
)

In [35]:
# Print the (id, c1, c2) table.
df.show()

In [36]:
@pandas_udf("id string, c1 long, c2 long, v double", PandasUDFType.GROUPED_MAP)
def find_mean(pdf):
  """Grouped-map pandas UDF: append column `v`, the row-wise mean of c1 and c2.

  `pdf` is the pandas DataFrame for one group of `df` (columns id, c1, c2).
  The returned frame keeps those columns and adds `v`, so the declared output
  schema lists all four columns; `df.schema` alone lacked `v`.

  NOTE(review): the previous body computed `pdf.v - pdf.id`, but `df` has no
  `v` column and `id` is a string, so it could never run. The mean of c1 and
  c2 is inferred from the function name -- confirm the intended calculation.
  """
  return pdf.assign(v=pdf[["c1", "c2"]].mean(axis=1))

In [37]:
@pandas_udf("id string, c1 long, c2 long, v long", PandasUDFType.GROUPED_MAP)
def sub_mean(pdf):
  """Grouped-map pandas UDF: append column `v`, the row-wise minimum of c1 and c2.

  The built-in min() cannot be used here: min(pdf.c1, pdf.c2) compares two
  Series and then takes the truth value of the result, which pandas rejects
  with ValueError. DataFrame.min(axis=1) computes the element-wise minimum.
  The declared output schema includes `v` because the returned frame has it.
  """
  return pdf.assign(v=pdf[["c1", "c2"]].min(axis=1))

In [38]:
# First 20 rows of the Titanic data. Reuse the already-loaded titanicData
# instead of re-reading the CSV: a fresh read with inferSchema=True makes an
# extra pass over the file just to infer column types.
titanicData_limit = titanicData.limit(20)

In [39]:
# Print the 20-row sample.
titanicData_limit.show()