# Cognizant Interview Recap
  - union() vs unionAll()
  - StorageLevel - class
  - rank & dense_rank 
  - Multiple conditions in when clause 
  - reduce vs  reduceByKey 
  - repartition vs coalesce 
  - Copy a partitioned table to another partitioned table
  - Decide the total no. of buckets for a hive table
  
## Update History
   - Created by Sophia Yue
   - Date: Feb 2021

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()


## DataFrame union() vs unionAll()
 - union() merges two DataFrames that have the same structure/schema. Columns are matched by position, not by name, and Spark raises an error if the two DataFrames have a different number of columns.
 - unionAll() was deprecated in Spark 2.0.0 in favour of union(); the two behave the same.
 - In SQL, UNION eliminates duplicates, whereas UNION ALL merges two datasets and keeps duplicate records.
 - In PySpark, both union() and unionAll() keep duplicates. Use the DataFrame distinct() or dropDuplicates() function to remove duplicate rows.

## StorageLevel
 - class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
 - RDD storage levels:
   - DISK_ONLY = StorageLevel(True, False, False, False, 1)
   - DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
   - MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
   - MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
   - MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
   - MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
   - MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
   - MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
   - MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
   - MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
   - OFF_HEAP = StorageLevel(True, True, True, False, 1)
   

## rank & dense_rank
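    - Apply dense_rank to 'salary' and get the employees in the top 3 ranks. The window used below sorts salary in ascending order, so rank 1 is the lowest salary; order by col("salary").desc() instead to rank the highest salaries first.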

In [2]:


simpleData = (("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  )


In [3]:
 
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [4]:
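from pyspark.sql.window import Window
# Alternative: rank within each department
# winSpec = Window.partitionBy("department").orderBy("salary")
# Global window ordered by ascending salary, so rank 1 is the lowest salary.
# Without partitionBy, Spark moves all rows to a single partition.
winSpec = Window.orderBy("salary")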


In [5]:
# rank: tied rows share a rank and the following ranks are skipped (2, 2, 2, 2, then 6)
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(winSpec)) \
    .show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Kumar| Marketing|  2000|   1|
|        James|     Sales|  3000|   2|
|        Maria|   Finance|  3000|   2|
|        James|     Sales|  3000|   2|
|         Jeff| Marketing|  3000|   2|
|        Scott|   Finance|  3300|   6|
|          Jen|   Finance|  3900|   7|
|       Robert|     Sales|  4100|   8|
|         Saif|     Sales|  4100|   8|
|      Michael|     Sales|  4600|  10|
+-------------+----------+------+----+



In [6]:
# dense_rank: tied rows share a rank and no ranks are skipped (2, 2, 2, 2, then 3)
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(winSpec)) \
    .show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        Kumar| Marketing|  2000|         1|
|        James|     Sales|  3000|         2|
|        Maria|   Finance|  3000|         2|
|        James|     Sales|  3000|         2|
|         Jeff| Marketing|  3000|         2|
|        Scott|   Finance|  3300|         3|
|          Jen|   Finance|  3900|         4|
|       Robert|     Sales|  4100|         5|
|         Saif|     Sales|  4100|         5|
|      Michael|     Sales|  4600|         6|
+-------------+----------+------+----------+



In [7]:
# dense_rank, keeping only rows ranked 3 or lower (the three lowest distinct salaries with this window)
from pyspark.sql.functions import dense_rank
df.withColumn("salary_rank",dense_rank().over(winSpec)).filter('salary_rank <= 3 ') \
    .show()

+-------------+----------+------+-----------+
|employee_name|department|salary|salary_rank|
+-------------+----------+------+-----------+
|        Kumar| Marketing|  2000|          1|
|        James|     Sales|  3000|          2|
|        Maria|   Finance|  3000|          2|
|        James|     Sales|  3000|          2|
|         Jeff| Marketing|  3000|          2|
|        Scott|   Finance|  3300|          3|
+-------------+----------+------+-----------+



## Multiple conditions in when clause

In [8]:
from pyspark.sql.functions import when, col
df.withColumn("income",
       when(col("salary") < 3000, "low")
      .when(col("salary") < 4000, "medium")
      .otherwise("high")).show()

+-------------+----------+------+------+
|employee_name|department|salary|income|
+-------------+----------+------+------+
|        James|     Sales|  3000|medium|
|      Michael|     Sales|  4600|  high|
|       Robert|     Sales|  4100|  high|
|        Maria|   Finance|  3000|medium|
|        James|     Sales|  3000|medium|
|        Scott|   Finance|  3300|medium|
|          Jen|   Finance|  3900|medium|
|         Jeff| Marketing|  3000|medium|
|        Kumar| Marketing|  2000|   low|
|         Saif|     Sales|  4100|  high|
+-------------+----------+------+------+



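## reduce vs reduceByKey
 - reduce() is an action: it returns a single value to the driver. reduceByKey() is a transformation on a pair RDD: it returns a new RDD and is evaluated lazily.
 - reduce(f): Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.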
 - reduceByKey(func, numPartitions=None, partitionFunc=) : Merge the values for each key using an associative and commutative reduce function.


In [9]:
# Example using Python's functools.reduce (not RDD.reduce) to union many DataFrames
from functools import reduce
from pyspark.sql import DataFrame

def f_merge_dfs(*dfs):
    """
      function    : f_merge_dfs
      purpose     : Merge PySpark DataFrames row-wise
      parameter   :
        *dfs      : Any number of PySpark DataFrames, passed as separate arguments
      note        : Adapted from https://datascience.stackexchange.com/questions/11356/merging-multiple-data-frames-row-wise-in-pyspark
      example     : f_merge_dfs(td1, td2, td3, td4)  # merge PySpark DataFrames td1, td2, td3, and td4
      written by  : Sophia Yue
    """
    return reduce(DataFrame.union, dfs)

In [None]:
# Example using reduceByKey
# The code below is my Spark Streaming exercise, which uses reduceByKey to count every word
# I skip the code that connects to Unix via Ubuntu to provide the input
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

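# Create a local StreamingContext with two worker threads and a batch interval of 1 second
# SparkContext(...) fails if another context is already active in this session; stop that one first
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)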
# Create a DStream that will connect to hostname:port, like localhost:9999
# Firewalls might block this!
lines = ssc.socketTextStream("localhost", 9999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

# Nothing runs until the streaming context is started
ssc.start()
ssc.awaitTermination()

## repartition vs coalesce with PySpark
  - repartition() can increase or decrease the number of RDD or DataFrame partitions. It performs a full shuffle and spreads the data evenly across the new partitions.
  - coalesce() is used only to decrease the number of partitions, and does so more cheaply.
     - It merges existing partitions instead of performing a full shuffle, which minimizes the amount of data that is moved.
     - The trade-off is that the resulting partitions can be uneven in size.

## Copy a partitioned table to another partitioned table
    - The example below is from
      - https://stackoverflow.com/questions/24211372/loading-data-from-one-hive-table-to-another-with-partition

In [None]:
CREATE EXTERNAL TABLE IF NOT EXISTS reg_logs (
id int,
region_code int,
count int
)
PARTITIONED BY (utc_date STRING, utc_hour STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/ad_data/raw/reg_logs';

CREATE EXTERNAL TABLE IF NOT EXISTS reg_logs_org (
id int,
region_code int,
count int
)
PARTITIONED BY (utc_date STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/ad_data/reg_logs_org';

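-- Every partition column in this insert is dynamic, so nonstrict mode is required
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Roll the hourly partitions of reg_logs up into daily partitions of reg_logs_org
-- The dynamic partition column (utc_date) must come last in the SELECT list
INSERT OVERWRITE TABLE reg_logs_org PARTITION (utc_date)
SELECT id, region_code, SUM(count), utc_date
FROM reg_logs
GROUP BY utc_date, id, region_code;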

## Decide the total no. of buckets for a hive table
    - I read the answer linked below, but I did not come away with a clear rule.
      - https://stackoverflow.com/questions/30730567/how-can-we-decide-the-total-no-of-buckets-for-a-hive-table 
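One rule of thumb that is often quoted is to divide the table size by a target bucket size (for example, the HDFS block size) and round up to a power of two. The sketch below only works through that arithmetic. The function name, the 128 MiB default, and the power-of-two rounding are assumptions for illustration, not an official Hive rule, and the right number also depends on the join and sampling patterns.

```python
import math

def estimate_bucket_count(table_size_bytes, target_bucket_bytes=128 * 1024**2):
    """Hypothetical helper: rough bucket count from table size (rule of thumb only)."""
    # Number of buckets needed so that each bucket is about the target size.
    raw = max(1, math.ceil(table_size_bytes / target_bucket_bytes))
    # Round up to the next power of two (an assumed convention, not a requirement).
    return 1 << (raw - 1).bit_length()

# A 10 GiB table with 128 MiB buckets needs 80 buckets, rounded up to 128.
buckets_10gib = estimate_bucket_count(10 * 1024**3)
```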