# GUIDE ON PYSPARK OPERATIONS
---
## Language
- Python 2.7
- Spark/PySpark 1.6

## Table of Contents
 1. [Initial Setup (Required)](#Initial Setup)
 2. [Import Data](#Import Data)
     import hdfs hive tables from cornerstone
 3. [Inspect Data](#Inspect Data)
       <br />a. [Show Table](#first rows)
       <br />b. [Table Dimension](#DF dimension)
       <br />c. [Schema Review](#Schema)
       <br />d. hive table metadata (variable desc) (Tad is codeready)
 4. [Exam Data](#missing value)
       <br />a. [Describe DataFrame](#describe)
       <br />b. [Missing Value](#null)
       <br />c. [NA Value](#NA)
       <br />d. [Unique Value](#duplicates)
 5. [Data Cleaning](#data cleaning)
       <br />a. [Null Drop/Fill](#treatment)
       <br />b. [Time Format](#time_table)
       <br />c. [Change Float to String](#change_schema)
       <br />d. [String Structure](#string_split)
 6. [SQL Data Extraction](#sql)
       <br />a. [Select, Filter, Delete, Sort](#select_filter)
       <br />b. [Group By and Summerize](#group_by)
       <br />c. [Deriving New Variables](#new_var)
       <br />d. [Renanme Variables](#rename_var)
 7. [SQL Table Relationship](#relationship)
       <br />a. [Append Rows](#append)
       <br />b. [Add Columns](#adding columns)
       <br />c. [Join Tables](#joins)
 8. [Analysis](#statistic)
 9. [Sampling Method and Data Ouput](#output)

<a id='Initial Setup'></a>

## 1. Initial Setup

In this section, we will cover initializing a local SparkContext, which is then wrapped with SQLContext for additional functionality.  
    - SparkContext object, which represents a connection to a computer cluster.  
    - SQLContext is the main entry point for Spark DataFrame and SQL functionality.  It can create data frame which allows you to direct the operation on your data. 

In [30]:
from IPython.display import display, HTML
from pyspark import SQLContext, SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

import os
SUBMIT_ARGS = "--packages com.databricks:spark-csv_2.11:1.2.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

try:
    sc = SparkContext()
    sqlc = SQLContext(sc)
except:
    print "Setup Ready"
    
    # Verifying if Spark is active
print(sc.version)

Setup Ready
1.6.1


<a id='Import Data'></a>

## 2. Import Data 

In this section, we will cover how to load a flat file (specifically, a csv) using a custom format reader.

In [31]:
## IN SAS:
#{"IsDistinguishedFolder":true,"FolderId":{"Id":"AAMkADQ1YTQ0MmY3LTdmYjMtNGM5Zi1hYTY2LTUyNmFmMGI4YjE3ZQAuAAAAAADXkznN6HMLRoMOPc/OzpiAAQCuiSOT3Tn/ToC85VO/Qu1kAAAAAAEMAAA=","ChangeKey":"AQAAABYAAACuiSOT3Tn/ToC85VO/Qu1kAACLRDeL"},"DragItemType":2}# SAS equivalent: proc import
# proc import datafile="../data/cs-training-dates.csv"
#             dbms=csv
#             out=loans
#             replace;
#         delimiter=",";
#         getnames=yes;
# run;

In [32]:
# First, speicify the path to the file.
data_file = "../a1_data/cs-training.csv"

# SQLContext read in the file and create a dataframe called "df"
df = (sqlc.read
      .format("com.databricks.spark.csv") # in the quotation mark you can change to your file type, i.e. ("json")
      .option("header", "true") # with header
      .option("inferschema", "true") # schema is true
      .load(data_file)) # load the file path you declared earlier

#### Note: 

Import data from multiple file formats. Please use help(sqlc.read) to explore. 

##### If you know your file type, you can use the following:
    - df_text = sqlc.read.text("***  .txt file path****")
    - df_json = sqlc.read.json("*** .json file path****")
    - df_parquet = sqlc.read.parquet("***.parquet file path****")

##### sqlc.read.load is the general way to handle different format. 
    - df = sqlc.read.format('jdbc').options(url=url;dbtable=query).load() 

<a id='Inspect Data'></a>

## 3. Inspect Data

In this section, we will be looking at how to get an overview of the data.

<a id="first rows"></a>
### 3.a)  Show Table
This is to look at the first 2 rows of the data. 

In [33]:
df.show(2)
## Alternatively, 
# df.first(): this can show you the first row of the data. 

+-------+----------------+------------------------------------+---+------------------------------------+-----------+-------------+-------------------------------+-----------------------+----------------------------+------------------------------------+------------------+
|user_id|SeriousDlqin2yrs|RevolvingUtilizationOfUnsecuredLines|age|NumberOfTime30_59DaysPastDueNotWorse|  DebtRatio|MonthlyIncome|NumberOfOpenCreditLinesAndLoans|NumberOfTimes90DaysLate|NumberRealEstateLoansOrLines|NumberOfTime60_89DaysPastDueNotWorse|NumberOfDependents|
+-------+----------------+------------------------------------+---+------------------------------------+-----------+-------------+-------------------------------+-----------------------+----------------------------+------------------------------------+------------------+
|  10000|               1|                         0.766126609| 45|                                   2|0.802982129|         9120|                             13|                      

<a id='DF dimension'></a>
### 3.b) Tabel Dimension
Check the shape of the dataframe

In [34]:
## check the shape of the dataframe
print "number of rows: {!r}".format(df.count()) # total number of rows
print "number of columns: {!r}".format(len(df.columns))

number of rows: 150000
number of columns: 12


<a id="Schema"></a>
### 3.c) Schema Review
<br />Print Schema of the imported data frame

In [35]:
df.printSchema()
#df.dtypes # this will give you a list of the tuples (column name , type)
# df.columns # this will give you a list of colnames only

root
 |-- user_id: integer (nullable = true)
 |-- SeriousDlqin2yrs: integer (nullable = true)
 |-- RevolvingUtilizationOfUnsecuredLines: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- NumberOfTime30_59DaysPastDueNotWorse: integer (nullable = true)
 |-- DebtRatio: double (nullable = true)
 |-- MonthlyIncome: string (nullable = true)
 |-- NumberOfOpenCreditLinesAndLoans: integer (nullable = true)
 |-- NumberOfTimes90DaysLate: integer (nullable = true)
 |-- NumberRealEstateLoansOrLines: integer (nullable = true)
 |-- NumberOfTime60_89DaysPastDueNotWorse: integer (nullable = true)
 |-- NumberOfDependents: string (nullable = true)



In [36]:
## This is to save the schema of the table to the a .csv file if you want. It is named schema.csv in this example.
#schema = sqlc.createDataFrame(df.dtypes,
#                              StructType([StructField('Column',StringType(),True),
#                                          StructField('DataType',StringType(),True)]))
#schema.write.format("com.databricks.spark.csv").option("header", "true").save("schema.csv")

<a id='missing value'></a>
## 4. Exam Data
In this section, we will exam the quality of the dataframe, including missing value (NULL), blank, or "NA" in the record. 
<a id="describe"></a>
### 4.a) Description of the DataFrame

In [37]:
# To give an overview of your selected variables
df.describe(["age", "SeriousDlqin2yrs", "RevolvingUtilizationOfUnsecuredLines"]).show()

+-------+------------------+-------------------+------------------------------------+
|summary|               age|   SeriousDlqin2yrs|RevolvingUtilizationOfUnsecuredLines|
+-------+------------------+-------------------+------------------------------------+
|  count|            150000|             150000|                              150000|
|   mean|52.295206666666665|            0.06684|                    6.04843805466686|
| stddev|14.771865863100352|0.24974553092872043|                  249.75537062543987|
|    min|                 0|                  0|                                 0.0|
|    max|               109|                  1|                             50708.0|
+-------+------------------+-------------------+------------------------------------+



In [38]:
## Look at statics of every varible
for col in df.columns:
    print df.describe(col).show()

+-------+------------------+
|summary|           user_id|
+-------+------------------+
|  count|            150000|
|   mean|           84999.5|
| stddev|43301.414526548666|
|    min|             10000|
|    max|            159999|
+-------+------------------+

None
+-------+-------------------+
|summary|   SeriousDlqin2yrs|
+-------+-------------------+
|  count|             150000|
|   mean|            0.06684|
| stddev|0.24974553092872043|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

None
+-------+------------------------------------+
|summary|RevolvingUtilizationOfUnsecuredLines|
+-------+------------------------------------+
|  count|                              150000|
|   mean|                    6.04843805466686|
| stddev|                  249.75537062543987|
|    min|                                 0.0|
|    max|                             50708.0|
+-------+------------------------------------+

None
+-------+------------------+

<a id="null"></a>
### 4.b) Missing Value
##### How many missing value in a particular column?

In [39]:
# The missing value count by variable can be obtained by using isNull() and count() functions

df.where(df["MonthlyIncome"].isNull()).count() # change the MonthlyIncome to the particular colnames you are examing.
#df.where(df["MonthlyIncome"].isNull()).show()

0

##### How many missing value per column?

In [44]:
### FUNCTION
from pyspark.sql.functions import col, sum

def count_null(col_name):
  return sum(col(col_name).isNull().cast('integer')).alias(col_name)

# Build up a list of column expressions, one per column.
#
# This could be done in one line with a Python list comprehension, but we're keeping
# it simple for those who don't know Python very well.
exprs = []
for col_name in df.columns:
  exprs.append(count_null(col_name))

# Run the aggregation. The *exprs converts the list of expressions into
# variable function arguments.
df.agg(*exprs).show()

+-------+----------------+------------------------------------+---+------------------------------------+---------+-------------+-------------------------------+-----------------------+----------------------------+------------------------------------+------------------+
|user_id|SeriousDlqin2yrs|RevolvingUtilizationOfUnsecuredLines|age|NumberOfTime30_59DaysPastDueNotWorse|DebtRatio|MonthlyIncome|NumberOfOpenCreditLinesAndLoans|NumberOfTimes90DaysLate|NumberRealEstateLoansOrLines|NumberOfTime60_89DaysPastDueNotWorse|NumberOfDependents|
+-------+----------------+------------------------------------+---+------------------------------------+---------+-------------+-------------------------------+-----------------------+----------------------------+------------------------------------+------------------+
|      0|               0|                                   0|  0|                                   0|        0|            0|                              0|                      0|      

##### Examine if there is relationship in between missing values in columns

In [45]:
# to check whether certian columns have null.
df.filter(df["age"].isNull()|
         df["SeriousDlqin2yrs"].isNull()|
         df["NumberOfDependents"].isNull()).count()

0

<a id = "NA"></a>
### 4.c) "NA"s Count
"NA" is string that insteand of NULL has been used in certain dataset. The code can also be replaced by another string.  
##### How many NA in a particular column?

In [46]:
print df.filter(df['MonthlyIncome'] == "NA").count()
print df.filter(df['NumberOfDependents'] == "NA").count()

29731
3924


##### Examine if there is relationship in between NAs in columns

In [47]:
## To examine if NA's relationship between colnames.
df.where(
    (col('MonthlyIncome') == 'NA') & 
    (col('NumberOfDependents') == 'NA')).show(2) 

+-------+----------------+------------------------------------+---+------------------------------------+---------+-------------+-------------------------------+-----------------------+----------------------------+------------------------------------+------------------+
|user_id|SeriousDlqin2yrs|RevolvingUtilizationOfUnsecuredLines|age|NumberOfTime30_59DaysPastDueNotWorse|DebtRatio|MonthlyIncome|NumberOfOpenCreditLinesAndLoans|NumberOfTimes90DaysLate|NumberRealEstateLoansOrLines|NumberOfTime60_89DaysPastDueNotWorse|NumberOfDependents|
+-------+----------------+------------------------------------+---+------------------------------------+---------+-------------+-------------------------------+-----------------------+----------------------------+------------------------------------+------------------+
|  10008|               0|                         0.116950644| 27|                                   0|     46.0|           NA|                              2|                      0|      

<a id = "duplicates"></a>
### 4.d) Unique Record Count
Count the number of duplicates records in the dataframe
##### How many exact unique rows in the dataframd? 

In [48]:
# Count the unique rows.
df.distinct().count()
# Then you can drop the duplicated rows
df.dropDuplicates() 

DataFrame[user_id: int, SeriousDlqin2yrs: int, RevolvingUtilizationOfUnsecuredLines: double, age: int, NumberOfTime30_59DaysPastDueNotWorse: int, DebtRatio: double, MonthlyIncome: string, NumberOfOpenCreditLinesAndLoans: int, NumberOfTimes90DaysLate: int, NumberRealEstateLoansOrLines: int, NumberOfTime60_89DaysPastDueNotWorse: int, NumberOfDependents: string]

##### How many duplicated records in a particular column?

In [49]:
# Count the unique records within the column called "user_id".
df.select(['user_id', 'age']).distinct().count()
#df.dropDuplicates(['user_id','age'])

150000

<a id="data cleaning"></a>
## 5. Data Cleaning
In this section, we will introduce the missing value treatment, and splitting a column of string into seperate columns
<a id="treatment"></a>
### 5.a) Null Drop / Fill 

In [50]:
## run the following two functions to create function to convert CERTAIN value 

# this is to convert blank to null (None) in Python
def blank_as_null(x):
    return when(col(x) != "", col(x)).otherwise(None)

# this is to convert "NA" to null (None) in Python
def NA_as_null(colname):
    return when(col(colname) != "NA", col(colname)).otherwise(None)

In [51]:
## convert "NA" to null (None) and rename it to dataframe dfNADropped
dfNADropped = df.withColumn("NumberOfDependents", NA_as_null("NumberOfDependents"))

## Drop the missing value rows where NumberOfDependents is missing
dfNADropped.na.drop().count()

146076

In [52]:
## convert "NA" to null (None) and rename it to dataframe dfNADropped
dfNAFilled = df.withColumn("NumberOfDependents", NA_as_null("NumberOfDependents"))

## Fill the missing value record by 0 in column NumberOfDependents
dfNAFilled.na.fill({"NumberOfDependents": 0}).count()

150000

<a id='time_table'></a>
### 5.b) Time Format

In [53]:
##  Convert Common Log time format into a Python datetime object FUNCTION
month_map = {
  'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
  'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
}

def parse_clf_time(s):
    """ Convert Common Log time format into a Python datetime object
    Args:
        s (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
    Returns:
        a string suitable for passing to CAST('timestamp')
    """
    # NOTE: We're ignoring time zone here. In a production application, you'd want to handle that.
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
      int(s[7:11]),
      month_map[s[3:6]],
      int(s[0:2]),
      int(s[12:14]),
      int(s[15:17]),
      int(s[18:20])
    )

u_parse_time = udf(parse_clf_time)

In [54]:
# To use it, change the "" column to your time date columne

#df.select('*', u_parse_time(df['timestamp']).cast('timestamp').alias('time'))

<a id='change_schema'></a>
### 5.c) Change Float to String

In [55]:
# Create a new table that add a new column of changing the data type of a columne and drop the original column

convert_numbers_to_string = df.withColumn("MonthlyIncome_new", df["MonthlyIncome"].cast('float')).drop("MonthlyIncome")
convert_numbers_to_string.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- SeriousDlqin2yrs: integer (nullable = true)
 |-- RevolvingUtilizationOfUnsecuredLines: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- NumberOfTime30_59DaysPastDueNotWorse: integer (nullable = true)
 |-- DebtRatio: double (nullable = true)
 |-- NumberOfOpenCreditLinesAndLoans: integer (nullable = true)
 |-- NumberOfTimes90DaysLate: integer (nullable = true)
 |-- NumberRealEstateLoansOrLines: integer (nullable = true)
 |-- NumberOfTime60_89DaysPastDueNotWorse: integer (nullable = true)
 |-- NumberOfDependents: string (nullable = true)
 |-- MonthlyIncome_new: float (nullable = true)



<a id="string_split"></a>
## 5.d) String Structure

In [56]:
## The first four characters of the string in a column and now rename it test. 
## Note that this only works when the columns is in STRING format.  Please see 5.c) step.
convert_numbers_to_string.select(convert_numbers_to_string["MonthlyIncome_new"].substr(0, 4).alias('test')).show(3)

+----+
|test|
+----+
|9120|
|2600|
|3042|
+----+
only showing top 3 rows



In [57]:
# split a column by the numbers [as9df] will be [as, df]; can be sub by '-'. This will be saperated by the "-",
# convert_numbers_to_string.select(split(convert_numbers_to_string["MonthlyIncome_new"], '[0-9]+').alias('test')).collect()

[Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=None),
 Row(test=[u'', u'.', u'']),
 Row(test=None),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=None),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=None),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row(test=[u'', u'.', u'']),
 Row

## 6. SQL Data Extraction
After previous steps, in this section, you can extract the specific variables you need and use the standard SQL Query language to do your work here.
<br /> **Note:** The registerDataFrameAsTable step is necessary
<a id='select_filter'></a>
### 6.a) Select, Filter, Delete, Sort

In [58]:
# NEED TO CONVERT DATAFRAME TO TABLE FIRST 
sqlc.registerDataFrameAsTable(df, "sql_table")

In [59]:
## in the """ """, you can use your standard SQL query.
sqlc.sql("""SELECT * FROM sql_table WHERE age >= 18 SORT BY SeriousDlqin2yrs""").show(2)  # Select all the the records filter on age column 

## Alternatively, you can do the following 
# df.filter(df["age"] >=18).show(3).sort(df["age"])
# df.drop('age').columns

+-------+----------------+------------------------------------+---+------------------------------------+-----------+-------------+-------------------------------+-----------------------+----------------------------+------------------------------------+------------------+
|user_id|SeriousDlqin2yrs|RevolvingUtilizationOfUnsecuredLines|age|NumberOfTime30_59DaysPastDueNotWorse|  DebtRatio|MonthlyIncome|NumberOfOpenCreditLinesAndLoans|NumberOfTimes90DaysLate|NumberRealEstateLoansOrLines|NumberOfTime60_89DaysPastDueNotWorse|NumberOfDependents|
+-------+----------------+------------------------------------+---+------------------------------------+-----------+-------------+-------------------------------+-----------------------+----------------------------+------------------------------------+------------------+
|  10001|               0|                         0.957151019| 40|                                   0|0.121876201|         2600|                              4|                      

<a id="group_by"></a>
### 6.b) Group By and Summerize

In [60]:
# GroupBy Statement:
sqlc.sql("""SELECT avg(MonthlyIncome) AS Avg_Mon_Income, max(age) AS max_age FROM sql_table GROUP BY NumberOfTimes90DaysLate, NumberOfDependents""").show(2)

## Alternatively: You can group by any number of coulmns and calculate any kind of aggregate function
#df.groupBy('NumberOfTimes90DaysLate','NumberOfDependents').agg({"MonthlyIncome": "avg", "age": "max"}).show(5)

+-----------------+-------+
|   Avg_Mon_Income|max_age|
+-----------------+-------+
|4658.904504504504|     88|
|4804.447811447812|     78|
+-----------------+-------+
only showing top 2 rows



<a id= "new_var"></a>
### 6. c) Deriving New Variables

In [61]:
derived_var_table = sqlc.sql(
    """SELECT user_id, MonthlyIncome/NumberOfDependents AS Ratio_Income_to_Dependents 
    FROM sql_table""").show(2)

# Alternatively:
# df.withColumn('Ratio_Income_to_Dependents',df.MonthlyIncome/df.NumberOfDependents).show(3)

+-------+--------------------------+
|user_id|Ratio_Income_to_Dependents|
+-------+--------------------------+
|  10000|                    4560.0|
|  10001|                    2600.0|
+-------+--------------------------+
only showing top 2 rows



<a id= "rename_var"></a>
### 6. d) Rename Variables

In [62]:
# Rename the variable user_id to USER_ID
sqlc.sql("""SELECT user_id as USER_ID, age, MonthlyIncome FROM sql_table""").columns

# Alternatively: A column can be renamed as
# df = df.withColumnRenamed('user_id', 'USER_ID')
# df.columns

['USER_ID', 'age', 'MonthlyIncome']

<a id="relationship"></a>

## 7. SQL Table Relationship
In this section, we will focus on multiple table manipulation. 
<a id="append"></a>
###  7.a) Append Rows

In [63]:
## Note that Spark Dataframe is immutable.  df after unionAll is still df with 150K rows.
df1 = df.unionAll(df)
df1.count()

300000

<a id="adding columns"></a>
###  7.b) Add Columns

In [64]:
# Create a new column called "new_var" with number 0 
df_with_new_var = df.withColumn("new_var", lit(0))

<a id="joins"></a>
###  7.c) Join Table

In [65]:
# NEED TO CONVERT DATAFRAME TO TABLE FIRST. 
sqlc.registerDataFrameAsTable(df, "sql_table_1")
sqlc.registerDataFrameAsTable(df, "sql_table_2")

In [66]:
## LEFT JOIN

left_join = sqlc.sql("""SELECT * FROM sql_table_1 LEFT JOIN sql_table_2 ON sql_table_1.user_id = sql_table_2.user_id""")

# Alternatively, you can write it the following style:
# left_join = df1.join(df2, df1["user_id"] == df2["user_id"], 'leftouter').drop(df2["user_id"]).show(1)

In [67]:
## INNER JOIN
inner_join = sqlc.sql("""SELECT * FROM sql_table_1 INNER JOIN sql_table_2 ON sql_table_1.user_id = sql_table_2.user_id""")

# Alternatively, you can write it the following style:
# inner_join = df1.join(df2, df1["user_id"] == df2["user_id"], 'inner').drop(df2["user_id"]).show(1)

In [68]:
## OUTER JOIN
outer_join = sqlc.sql("""SELECT * FROM sql_table_1 FULL OUTER JOIN sql_table_2 ON sql_table_1.user_id = sql_table_2.user_id""")

# Alternatively, you can write it the following style:
#outer_join = df1.join(df2, df1["user_id"] == df2["user_id"], 'outer').drop(df2["user_id"]).show(1)

<a id="statistic"></a>
## 8. Initial Analysis

In [69]:
## Show the frequency of certain categorical variables
df.groupBy('SeriousDlqin2yrs').count().show()

+----------------+------+
|SeriousDlqin2yrs| count|
+----------------+------+
|               0|139974|
|               1| 10026|
+----------------+------+



In [70]:
## Looking for min , average, max of a particular columns.
output = df.agg(min(df['age']), avg(df['age']),max(df['age'])).show()

+--------+------------------+--------+
|min(age)|          avg(age)|max(age)|
+--------+------------------+--------+
|       0|52.295206666666665|     109|
+--------+------------------+--------+



<a id= "output"></a>
## 9. Sampling and Data Output

In [74]:
# A Sample can be created out of a dataset using the sample function
# Please specify the value withReplacement as True or False with fraction and also seed value if required
sample_Data = df.sample(withReplacement=True,fraction=0.5)

# You can also create a sample using a column 
# df.sampleBy('age',fractions={0:1}).show()

In [77]:
## Output the sample data 
sample_Data.write.json("../a1_data/sample_Data")

# df.write.csv(path)/ df.write.jason(path)/ df.write.parquet(path)/ df.write.orc(path) 