In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col, translate, lower
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql import functions as F
spark=SparkSession.builder.appName('BigData').master('yarn').getOrCreate()
sc=spark.sparkContext
sql=SQLContext(sc)

In [3]:
spark

In [4]:
orders = spark.read.csv('/public/retail_db/orders',inferSchema=True).toDF('order_id','order_date','order_total','order_status')
order_items = spark.read.csv('/public/retail_db//order_items',header=True,inferSchema=True)
customers = spark.read.csv('/public/retail_db//customers',header=True,inferSchema=True)
products = spark.read.csv('/public/retail_db//products',header=True,inferSchema=True)

In [3]:
orders = spark.read.csv('retail_db//orders.csv',header=True,inferSchema=True)
order_items = spark.read.csv('retail_db//order_items.csv',header=True,inferSchema=True)
customers = spark.read.csv('retail_db//customers.csv',header=True,inferSchema=True)
products = spark.read.csv('retail_db//products.csv',header=True,inferSchema=True)

AnalysisException: Path does not exist: hdfs://localhost:9000/user/pankaj/retail_db/orders.csv;

# select

In [5]:
orders.select(orders.order_status,'order_status',col('order_status')).show()

+---------------+---------------+---------------+
|   order_status|   order_status|   order_status|
+---------------+---------------+---------------+
|         CLOSED|         CLOSED|         CLOSED|
|PENDING_PAYMENT|PENDING_PAYMENT|PENDING_PAYMENT|
|       COMPLETE|       COMPLETE|       COMPLETE|
|         CLOSED|         CLOSED|         CLOSED|
|       COMPLETE|       COMPLETE|       COMPLETE|
|       COMPLETE|       COMPLETE|       COMPLETE|
|       COMPLETE|       COMPLETE|       COMPLETE|
|     PROCESSING|     PROCESSING|     PROCESSING|
|PENDING_PAYMENT|PENDING_PAYMENT|PENDING_PAYMENT|
|PENDING_PAYMENT|PENDING_PAYMENT|PENDING_PAYMENT|
| PAYMENT_REVIEW| PAYMENT_REVIEW| PAYMENT_REVIEW|
|         CLOSED|         CLOSED|         CLOSED|
|PENDING_PAYMENT|PENDING_PAYMENT|PENDING_PAYMENT|
|     PROCESSING|     PROCESSING|     PROCESSING|
|       COMPLETE|       COMPLETE|       COMPLETE|
|PENDING_PAYMENT|PENDING_PAYMENT|PENDING_PAYMENT|
|       COMPLETE|       COMPLETE|       COMPLETE|


#### But while using functions it is applicable to specify the full notation, i.e., dataframe.column_name or using col as below

In [30]:
orders.select(lower(orders.order_status),lower(col('order_status')),lower('order_status')).show()

+-------------------+-------------------+-------------------+
|lower(order_status)|lower(order_status)|lower(order_status)|
+-------------------+-------------------+-------------------+
|             closed|             closed|             closed|
|    pending_payment|    pending_payment|    pending_payment|
|           complete|           complete|           complete|
|             closed|             closed|             closed|
|           complete|           complete|           complete|
|           complete|           complete|           complete|
|           complete|           complete|           complete|
|         processing|         processing|         processing|
|    pending_payment|    pending_payment|    pending_payment|
|    pending_payment|    pending_payment|    pending_payment|
|     payment_review|     payment_review|     payment_review|
|             closed|             closed|             closed|
|    pending_payment|    pending_payment|    pending_payment|
|       

# alias
#### alias should be enclosed within select 

### Note : Please do not give alias the same name as built-in functions 

In [31]:
#orders.select(orders.order_status.alias('Status_alias')).show()
#Note: orders.select('order_status').alias('Status_alias').show()  will not work
orders.select(lower(orders.order_status).alias('1'),lower(col('order_status')).alias('2'),lower('order_status').alias('3'))

DataFrame[1: string, 2: string, 3: string]

## Distinct

In [32]:
orders.select(lower(orders.order_status).alias('1'),lower(col('order_status')).alias('2'),lower('order_status').alias('3')).distinct().show()

+---------------+---------------+---------------+
|              1|              2|              3|
+---------------+---------------+---------------+
|        on_hold|        on_hold|        on_hold|
|     processing|     processing|     processing|
|       canceled|       canceled|       canceled|
|        pending|        pending|        pending|
|         closed|         closed|         closed|
| payment_review| payment_review| payment_review|
|       complete|       complete|       complete|
|suspected_fraud|suspected_fraud|suspected_fraud|
|pending_payment|pending_payment|pending_payment|
+---------------+---------------+---------------+



# withcolumn
#### If already existing column name is given, then the new column expression will replace the existing column. (It will not add another column)


In [None]:
orders = orders. \
withColumn('order_id',orders.order_id.cast('bigint')). \
withColumn('order_date',orders.order_date.cast('date')). \
withColumn('order_customer_id',orders.order_customer_id.cast('bigint')). \
withColumn('order_status',col('order_status').cast('string')) 


In [None]:
orders.select('order_id','order_status').withColumn('SomethingNew',orders.order_id+1).show()

#### Given below are the cast notations from pyspark.sql.types

In [None]:
BinaryType: binary
BooleanType: boolean
ByteType: tinyint
DateType: date
DecimalType: decimal(10,0)
DoubleType: double
FloatType: float
IntegerType: int
LongType: bigint
ShortType: smallint
StringType: string
TimestampType: timestamp

# selectExpr

#### SQL like expressions can be used for evaluation

In [None]:
orders.selectExpr('order_id||order_date||","||order_customer_id||","||order_status||"," as textdata ').show(1)

# case
#### CASE as in SQL can be used

In [None]:
orders.selectExpr('CASE WHEN order_status in ("COMPLETE","CLOSED") THEN "COMPLETELD" WHEN order_status = "CANCELED" THEN "CANCEL" ELSE "NONE" END Derived_status').show()

# when
#### Dataframe equivalent of case 

In [None]:
orders.withColumn('Derived',when(orders.order_status.isin('COMPLETE','CLOSED'),'DONEEE').when(orders.order_status=='PENDING_PAYMENT','Gareeb').otherwise('Bhool Ja')).show()

# withColumnRenamed
#### columns can be renamed with this API, where the first argument is the existing column name and the second argument is the new name.

In [None]:
orders.withColumnRenamed('order_status','status_of_order').show()

# filter

#### filter as in SQL format

In [None]:
orders.filter?

In [None]:
orders.filter("order_status = 'COMPLETE'").show()
#Note: Single '=' as in SQL format

#### filter as in Dataframe format

In [None]:
orders.filter(orders.order_status == 'COMPLETE').show()
#Note: Equality operator '==' for dataframe filter expression 

#### multiple filters in SQL format filter

In [None]:
orders.filter("order_status = 'COMPLETE' OR order_status = 'CLOSED'").show()

#### multiple filters in DataFrame format

In [None]:
#orders.filter((orders.order_status == 'COMPLETE').__or__(orders.order_status == 'CLOSED')).show() 
orders.filter((orders.order_status == 'COMPLETE') | (orders.order_status == 'CLOSED')).show()

###### Examples of few multiple filters

In [None]:
orders.filter("order_status in ('COMPLETE' ,'CLOSED') AND date_format(order_date,'yyyyMM') = '201308'").show()

In [None]:
#orders.registerTempTable("df")
#spark.sql("select * from df where order_date like '2013-08%'").show()

orders.filter(orders.order_status.isin('COMPLETE','CLOSED') & orders.order_date.like('2013-08%')).show()

In [None]:
orders.filter('order_customer_id >= 1000').filter('order_customer_id != 2321').filter(orders.order_id.between(1000,1999)).show()

# join

#### simple inner join with one column mapping

In [None]:
orders.join?

In [7]:
orders.join(order_items, orders.order_id==order_items.order_item_order_id).show()

AttributeError: 'DataFrame' object has no attribute 'order_item_order_id'

To perform join but get results from only the left dataframe.

In [None]:
orders.join(orders, ['order_id'],how='left_semi').show()

#### inner join with multiple columns

In [None]:
orders.join(order_items, [orders.order_id==order_items.order_item_order_id, orders.order_id==order_items.order_item_order_id]).show()

In [None]:
#### left join

In [None]:
customers.join(orders, customers.customer_id==orders.order_customer_id, 'left').show()

#### right join

In [None]:
customers.join(orders, customers.customer_id==orders.order_customer_id, 'right').show()

#### leftanti join - Join to fetch records which exists only in one table
In the below case it will fetch the records which are present only in the orders and not in order_items

In [None]:
customers.join(orders, orders.order_customer_id==customers.customer_id, 'leftanti').show()

#### crossJoin

In [None]:
orders.crossJoin(customers.filter(customers.customer_id == 1)).show()

# distinct
#### distinct will come at the end after the select

In [6]:
orders.select('order_status').distinct().show()

+---------------+
|   order_status|
+---------------+
|PENDING_PAYMENT|
|       COMPLETE|
|        ON_HOLD|
| PAYMENT_REVIEW|
|     PROCESSING|
|         CLOSED|
|SUSPECTED_FRAUD|
|        PENDING|
|       CANCELED|
+---------------+



# countDinstinct
#### countDistinct will be handy 

In [None]:
orders.select(countDistinct('order_status')).show()

# orderBy/sort
#### Note: sort is just an alias to orderBy

In [None]:
orders.orderBy(orders.order_date, orders.order_status.desc()).show()

Note: Only absolute column name notation or col() works while mentioning in descending order desc()

In [None]:
orders.orderBy('order_date',col('order_status').desc()).show()

# drop

drop() will only take just the column names and it will not take any other expressions for the column

In [None]:
order_items.join(products,order_items.order_item_product_id==products.product_id).drop('product_price','product_description','product_image').show()

In [None]:
#order_items.join(products, order_item_product_id==products.product_id).drop(products.product_price, products.product_description, products.product_image).show()
# Specifying full name of the column will not work 