In [1]:
import pyspark
from pyspark.sql import SparkSession
import traceback

# Creating a Spark session

In [2]:
spark = SparkSession.builder.appName('practice').getOrCreate()
spark

24/03/16 22:30:22 WARN Utils: Your hostname, Ds-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.29.89 instead (on interface en0)
24/03/16 22:30:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/16 22:30:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Reading a csv file

In [3]:
df = spark.read.csv('test1.csv', header= True, inferSchema=True) 
#inferSchema option tells the reader to infer data types from the source file.

df.show()
df.printSchema()

+---+-----+---+----------+
| id|names|age|experience|
+---+-----+---+----------+
|  0|  sai| 12|         1|
|  1|pawan| 24|         2|
|  2|    d| 36|         3|
+---+-----+---+----------+

root
 |-- id: integer (nullable = true)
 |-- names: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



In [4]:
df_name_exp = df.select(['names', 'experience'])
df_name_exp.show()

print(type(df_name_exp))
df.dtypes
df.describe().show()

+-----+----------+
|names|experience|
+-----+----------+
|  sai|         1|
|pawan|         2|
|    d|         3|
+-----+----------+

<class 'pyspark.sql.dataframe.DataFrame'>


24/03/16 22:30:25 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+---+-----+----+----------+
|summary| id|names| age|experience|
+-------+---+-----+----+----------+
|  count|  3|    3|   3|         3|
|   mean|1.0| NULL|24.0|       2.0|
| stddev|1.0| NULL|12.0|       1.0|
|    min|  0|    d|  12|         1|
|    max|  2|  sai|  36|         3|
+-------+---+-----+----+----------+



# Columns in DataFrame

In [5]:
# Picking up a singel column will change the datatype to column not dataframe.
names = df_name_exp['names']
print(names, type(names))

try:
    # Show only works on the dataframe not on columns
    names.show()
except TypeError as e:
    print(traceback.print_exception(e))



Column<'names'> <class 'pyspark.sql.column.Column'>
None


Traceback (most recent call last):
  File "/var/folders/71/c7pdh8bj3p1chk4tv474y1g80000gn/T/ipykernel_14789/3921233689.py", line 7, in <module>
    names.show()
TypeError: 'Column' object is not callable


### Adding the columns

In [6]:
df_new_clm = df.withColumn('Experience after 2 years', df['experience'] + 2)
df_new_clm.show()

+---+-----+---+----------+------------------------+
| id|names|age|experience|Experience after 2 years|
+---+-----+---+----------+------------------------+
|  0|  sai| 12|         1|                       3|
|  1|pawan| 24|         2|                       4|
|  2|    d| 36|         3|                       5|
+---+-----+---+----------+------------------------+



### Droping the Column

In [7]:
df_new_clm.drop('Experience after 2 years').show()

+---+-----+---+----------+
| id|names|age|experience|
+---+-----+---+----------+
|  0|  sai| 12|         1|
|  1|pawan| 24|         2|
|  2|    d| 36|         3|
+---+-----+---+----------+



### Renaming the Column

In [8]:
df.withColumnRenamed('names', 'name').show()

+---+-----+---+----------+
| id| name|age|experience|
+---+-----+---+----------+
|  0|  sai| 12|         1|
|  1|pawan| 24|         2|
|  2|    d| 36|         3|
+---+-----+---+----------+



### Adding new rows using Union

In [9]:
from pyspark.sql import Row

new_rows = spark.createDataFrame(
    [
        (3, 'saipawan', 34, 4),
        (4, 'pawand', None, 5),
        (5, 'said', None, None),
        (None, None, None, None)
    ],
     df.columns
    )
new_rows.show()

df = df.union(new_rows)

df.show()


+----+--------+----+----------+
|  id|   names| age|experience|
+----+--------+----+----------+
|   3|saipawan|  34|         4|
|   4|  pawand|NULL|         5|
|   5|    said|NULL|      NULL|
|NULL|    NULL|NULL|      NULL|
+----+--------+----+----------+

+----+--------+----+----------+
|  id|   names| age|experience|
+----+--------+----+----------+
|   0|     sai|  12|         1|
|   1|   pawan|  24|         2|
|   2|       d|  36|         3|
|   3|saipawan|  34|         4|
|   4|  pawand|NULL|         5|
|   5|    said|NULL|      NULL|
|NULL|    NULL|NULL|      NULL|
+----+--------+----+----------+



In [10]:
data1 = [[1, 'sai', 2000], [2, 'pawan', 3000]]
schema1 = ['_id', 'name', 'salary']

data2 = [[3, 'dsai', 4000], [4, 'saipawan', 5000], [2, 'pawan', 3000]]
schema2 = ['_id', 'name', 'salary']

df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)

# Unlike sql, the union function does not remove the duplicate rows
df1.union(df2).show()
df1.unionAll(df2).show()

+---+--------+------+
|_id|    name|salary|
+---+--------+------+
|  1|     sai|  2000|
|  2|   pawan|  3000|
|  3|    dsai|  4000|
|  4|saipawan|  5000|
|  2|   pawan|  3000|
+---+--------+------+

+---+--------+------+
|_id|    name|salary|
+---+--------+------+
|  1|     sai|  2000|
|  2|   pawan|  3000|
|  3|    dsai|  4000|
|  4|saipawan|  5000|
|  2|   pawan|  3000|
+---+--------+------+



### Union by name

In [11]:
data1 = [[1, 'sai', 2000], [2, 'pawan', 3000]]
schema1 = ['_id', 'name', 'salary']

data2 = [[3, 'dsai', 'male'], [4, 'saipawan', 'male'], [5, 'xyz', 'female']]
schema2 = ['_id', 'name', 'gender']

df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)

try:
    # This will throw an error as there is an mismatch in the col name.
    df1.unionByName(df2)
except Exception as e:
    print(e)


# This will merge the dataFrames using the column name.
df1.unionByName(df2, allowMissingColumns= True).show()

Cannot resolve column name "salary" among (_id, name, gender).
+---+--------+------+------+
|_id|    name|salary|gender|
+---+--------+------+------+
|  1|     sai|  2000|  NULL|
|  2|   pawan|  3000|  NULL|
|  3|    dsai|  NULL|  male|
|  4|saipawan|  NULL|  male|
|  5|     xyz|  NULL|female|
+---+--------+------+------+



# Distinct and DropDuplicates

In [12]:
data = [[1, 'sai', 'male', 2000],[2, 'pawan', 'male', 3000], [2, 'pawan', 'male', 3000], [3, 'd', 'female', 4000]]
schema = ['_id', 'name', 'gender', 'salary']

df = spark.createDataFrame(data, schema)
df.show()

+---+-----+------+------+
|_id| name|gender|salary|
+---+-----+------+------+
|  1|  sai|  male|  2000|
|  2|pawan|  male|  3000|
|  2|pawan|  male|  3000|
|  3|    d|female|  4000|
+---+-----+------+------+



In [13]:
df.distinct().show()

df.dropDuplicates().show()
# now it will only show the 1st occurance
df.dropDuplicates(['gender']).show()
df.dropDuplicates(['gender', 'salary']).show()


+---+-----+------+------+
|_id| name|gender|salary|
+---+-----+------+------+
|  1|  sai|  male|  2000|
|  2|pawan|  male|  3000|
|  3|    d|female|  4000|
+---+-----+------+------+

+---+-----+------+------+
|_id| name|gender|salary|
+---+-----+------+------+
|  1|  sai|  male|  2000|
|  2|pawan|  male|  3000|
|  3|    d|female|  4000|
+---+-----+------+------+

+---+----+------+------+
|_id|name|gender|salary|
+---+----+------+------+
|  3|   d|female|  4000|
|  1| sai|  male|  2000|
+---+----+------+------+

+---+-----+------+------+
|_id| name|gender|salary|
+---+-----+------+------+
|  3|    d|female|  4000|
|  1|  sai|  male|  2000|
|  2|pawan|  male|  3000|
+---+-----+------+------+



# Manipulating NULL values

In [14]:
df = spark.createDataFrame(
    [
        (0, 'sai', 12, 1),
        (1, 'pawan', 24, 2),
        (2, 'd', 36, 3),
        (3, 'saipawan', 34, 4),
        (4, 'pawand', None, 5),
        (5, 'said', None, None),
        (None, None, None, None)
    ],
     ['id', 'name','age', 'experience']
    )
df.show()
df.printSchema()

+----+--------+----+----------+
|  id|    name| age|experience|
+----+--------+----+----------+
|   0|     sai|  12|         1|
|   1|   pawan|  24|         2|
|   2|       d|  36|         3|
|   3|saipawan|  34|         4|
|   4|  pawand|NULL|         5|
|   5|    said|NULL|      NULL|
|NULL|    NULL|NULL|      NULL|
+----+--------+----+----------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- experience: long (nullable = true)



### Droping Null Values

In [15]:
df.na.drop(how='any').show()  # this is the default values
df.na.drop(how='all').show()

+---+--------+---+----------+
| id|    name|age|experience|
+---+--------+---+----------+
|  0|     sai| 12|         1|
|  1|   pawan| 24|         2|
|  2|       d| 36|         3|
|  3|saipawan| 34|         4|
+---+--------+---+----------+

+---+--------+----+----------+
| id|    name| age|experience|
+---+--------+----+----------+
|  0|     sai|  12|         1|
|  1|   pawan|  24|         2|
|  2|       d|  36|         3|
|  3|saipawan|  34|         4|
|  4|  pawand|NULL|         5|
|  5|    said|NULL|      NULL|
+---+--------+----+----------+



In [16]:
df.na.drop(how='any',thresh=2).show()
# This will delete all the rows with dataset which have more than <thresh> non null values.

+---+--------+----+----------+
| id|    name| age|experience|
+---+--------+----+----------+
|  0|     sai|  12|         1|
|  1|   pawan|  24|         2|
|  2|       d|  36|         3|
|  3|saipawan|  34|         4|
|  4|  pawand|NULL|         5|
|  5|    said|NULL|      NULL|
+---+--------+----+----------+



In [17]:
df.na.drop(how='any', subset=['experience']).show()
# Specify which columns you want to focus on.

+---+--------+----+----------+
| id|    name| age|experience|
+---+--------+----+----------+
|  0|     sai|  12|         1|
|  1|   pawan|  24|         2|
|  2|       d|  36|         3|
|  3|saipawan|  34|         4|
|  4|  pawand|NULL|         5|
+---+--------+----+----------+



### Filling the missing values

In [18]:
# fill and fillNa will work the same way.
df.fillna('Missing Values', subset=['name']).show()

df.na.fill( 0 , subset=['age', 'experience']).show()

+----+--------------+----+----------+
|  id|          name| age|experience|
+----+--------------+----+----------+
|   0|           sai|  12|         1|
|   1|         pawan|  24|         2|
|   2|             d|  36|         3|
|   3|      saipawan|  34|         4|
|   4|        pawand|NULL|         5|
|   5|          said|NULL|      NULL|
|NULL|Missing Values|NULL|      NULL|
+----+--------------+----+----------+

+----+--------+---+----------+
|  id|    name|age|experience|
+----+--------+---+----------+
|   0|     sai| 12|         1|
|   1|   pawan| 24|         2|
|   2|       d| 36|         3|
|   3|saipawan| 34|         4|
|   4|  pawand|  0|         5|
|   5|    said|  0|         0|
|NULL|    NULL|  0|         0|
+----+--------+---+----------+



In [19]:
from pyspark.ml.feature import Imputer

cols = ['age', 'experience']

imputer = Imputer(
    inputCols = cols,
    outputCols = [ f'{col}_impute' for col in cols]
).setStrategy('mean')

imputer.fit(df).transform(df).show()

+----+--------+----+----------+----------+-----------------+
|  id|    name| age|experience|age_impute|experience_impute|
+----+--------+----+----------+----------+-----------------+
|   0|     sai|  12|         1|        12|                1|
|   1|   pawan|  24|         2|        24|                2|
|   2|       d|  36|         3|        36|                3|
|   3|saipawan|  34|         4|        34|                4|
|   4|  pawand|NULL|         5|        26|                5|
|   5|    said|NULL|      NULL|        26|                3|
|NULL|    NULL|NULL|      NULL|        26|                3|
+----+--------+----+----------+----------+-----------------+



# Filter/where on DataFrame

In [20]:
df.show()

# Both filter and where works the same.
df.filter(
    (df['age'] > 15) & (df['age'] < 35)
).show()
# other operations are &,|, ==, ~

df.filter(
    ~(df['age'] < 15)
).show()

+----+--------+----+----------+
|  id|    name| age|experience|
+----+--------+----+----------+
|   0|     sai|  12|         1|
|   1|   pawan|  24|         2|
|   2|       d|  36|         3|
|   3|saipawan|  34|         4|
|   4|  pawand|NULL|         5|
|   5|    said|NULL|      NULL|
|NULL|    NULL|NULL|      NULL|
+----+--------+----+----------+

+---+--------+---+----------+
| id|    name|age|experience|
+---+--------+---+----------+
|  1|   pawan| 24|         2|
|  3|saipawan| 34|         4|
+---+--------+---+----------+

+---+--------+---+----------+
| id|    name|age|experience|
+---+--------+---+----------+
|  1|   pawan| 24|         2|
|  2|       d| 36|         3|
|  3|saipawan| 34|         4|
+---+--------+---+----------+



# GroupBy and Aggrigate Functions

In [21]:
data = [['Sai', 'Maths', 10000],
['Sai', 'English', 5000],
['Pawan', 'Science', 4000],
['Sai', 'Science', 4000],
['pawansai', 'Maths', 3000],
['saipawan', 'Maths', 20000],
['saipawan', 'English', 10000],
['saipawan', 'Science', 5000],
['dsai', 'Maths', 10000],
['dsai', 'Science', 2000]]

df = spark.createDataFrame(data, ['Name', 'Department', 'Salary'],)
df.show()


+--------+----------+------+
|    Name|Department|Salary|
+--------+----------+------+
|     Sai|     Maths| 10000|
|     Sai|   English|  5000|
|   Pawan|   Science|  4000|
|     Sai|   Science|  4000|
|pawansai|     Maths|  3000|
|saipawan|     Maths| 20000|
|saipawan|   English| 10000|
|saipawan|   Science|  5000|
|    dsai|     Maths| 10000|
|    dsai|   Science|  2000|
+--------+----------+------+



In [22]:
df.groupBy('Name').sum().show()

df.groupBy('Department').avg().sort('avg(Salary)').show()

+--------+-----------+
|    Name|sum(Salary)|
+--------+-----------+
|     Sai|      19000|
|   Pawan|       4000|
|pawansai|       3000|
|saipawan|      35000|
|    dsai|      12000|
+--------+-----------+

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|   Science|     3750.0|
|   English|     7500.0|
|     Maths|    10750.0|
+----------+-----------+



In [23]:
from pyspark.sql.functions import count, min, max

# Agg function are used to appply multipe aggrigate operation on one grp by action.
# * means all the columns.
df.groupby('Department').agg(count('*').alias("Count of all employes"), min('salary'), max('salary')).show()

+----------+---------------------+-----------+-----------+
|Department|Count of all employes|min(salary)|max(salary)|
+----------+---------------------+-----------+-----------+
|     Maths|                    4|       3000|      20000|
|   English|                    2|       5000|      10000|
|   Science|                    4|       2000|       5000|
+----------+---------------------+-----------+-----------+



# Pivot Function

In [24]:
data = [[1,'Sai', 'Maths', 'male'],
[2, 'Sai', 'English', 'male'],
[3,'Pawan', 'Science', 'female'],
[4, 'Sai', 'Science', 'female'],
[5, 'pawansai', 'Maths', 'female'],
[6, 'saipawan', 'Maths', 'male'],
[7, 'saipawan', 'English', 'male'],
[8, 'saipawan', 'Science', 'male'],
[9, 'dsai', 'Maths', 'female'],
[10, 'dsai', 'Science', 'female']]

df = spark.createDataFrame(data, ['id', 'Name', 'Department', 'Gender'],)
df.show()

+---+--------+----------+------+
| id|    Name|Department|Gender|
+---+--------+----------+------+
|  1|     Sai|     Maths|  male|
|  2|     Sai|   English|  male|
|  3|   Pawan|   Science|female|
|  4|     Sai|   Science|female|
|  5|pawansai|     Maths|female|
|  6|saipawan|     Maths|  male|
|  7|saipawan|   English|  male|
|  8|saipawan|   Science|  male|
|  9|    dsai|     Maths|female|
| 10|    dsai|   Science|female|
+---+--------+----------+------+



In [25]:
df.groupBy(['Department', 'Gender']).count().show()

# Pivot the gender column to get number of males and females in one department
pivot_df = df.groupBy('Department').pivot('Gender').count()
pivot_df.show()

# If I have to dsiaply only number of males in the department.
df.groupBy('Department').pivot('Gender', ['male']).count().show()


+----------+------+-----+
|Department|Gender|count|
+----------+------+-----+
|     Maths|  male|    2|
|   English|  male|    2|
|   Science|female|    3|
|     Maths|female|    2|
|   Science|  male|    1|
+----------+------+-----+

+----------+------+----+
|Department|female|male|
+----------+------+----+
|   Science|     3|   1|
|   English|  NULL|   2|
|     Maths|     2|   2|
+----------+------+----+

+----------+----+
|Department|male|
+----------+----+
|   Science|   1|
|   English|   2|
|     Maths|   2|
+----------+----+



In [26]:
# Unpivot functions
# There is not unpivot functionality in pyspark, we have to use stack functionality to unpivot.

pivot_df.show()

# Now I want this above dataframe with columns as Department, Gneder, Count as below.
"""
|Department|Gender|count|
+----------+------+-----+
|     Maths|  male|    2|
|   English|  male|    2|
|   Science|female|    3|
|     Maths|female|    2|
|   Science|  male|    1|
"""
from pyspark.sql.functions import expr, stack

# expr fucntion takes a sting as an input but treat that as a python code.
pivot_df.select('Department', expr("stack(2, 'M', male, 'F', female) as (gender, count)")).show()
# here 2 represents number of columns values that we are unpivoting, followed by their names.

+----------+------+----+
|Department|female|male|
+----------+------+----+
|   Science|     3|   1|
|   English|  NULL|   2|
|     Maths|     2|   2|
+----------+------+----+

+----------+------+-----+
|Department|gender|count|
+----------+------+-----+
|   Science|     M|    1|
|   Science|     F|    3|
|   English|     M|    2|
|   English|     F| NULL|
|     Maths|     M|    2|
|     Maths|     F|    2|
+----------+------+-----+



# OrderBy and Sort functions

In [27]:
data = [[1, 'sai', 'male', 5000],[2, 'pawan', 'male', 2000], [2, 'saipawan', 'male', 10000]]
schema = ['_id', 'name', 'gender', 'salary']

df = spark.createDataFrame(data, schema)
df.show()

+---+--------+------+------+
|_id|    name|gender|salary|
+---+--------+------+------+
|  1|     sai|  male|  5000|
|  2|   pawan|  male|  2000|
|  2|saipawan|  male| 10000|
+---+--------+------+------+



In [28]:
df.sort(df.salary).show()
df.sort(df.salary.desc()).show()

+---+--------+------+------+
|_id|    name|gender|salary|
+---+--------+------+------+
|  2|   pawan|  male|  2000|
|  1|     sai|  male|  5000|
|  2|saipawan|  male| 10000|
+---+--------+------+------+

+---+--------+------+------+
|_id|    name|gender|salary|
+---+--------+------+------+
|  2|saipawan|  male| 10000|
|  1|     sai|  male|  5000|
|  2|   pawan|  male|  2000|
+---+--------+------+------+



In [29]:
df.orderBy(df.salary).show()
df.orderBy(df.salary.desc()).show()

+---+--------+------+------+
|_id|    name|gender|salary|
+---+--------+------+------+
|  2|   pawan|  male|  2000|
|  1|     sai|  male|  5000|
|  2|saipawan|  male| 10000|
+---+--------+------+------+

+---+--------+------+------+
|_id|    name|gender|salary|
+---+--------+------+------+
|  2|saipawan|  male| 10000|
|  1|     sai|  male|  5000|
|  2|   pawan|  male|  2000|
+---+--------+------+------+



# Sample Function

In [30]:
df = spark.range(start = 1, end = 101)
df.show(5) # Desplaying only 1st 5 elements

# This will give approx 10% of the values selected at random, sometimes it can be more than 10% and sometimes it can be less than 10%
df1 = df.sample(fraction= 0.1) 
df1.show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+
only showing top 5 rows

+---+
| id|
+---+
| 24|
| 27|
| 28|
| 29|
| 45|
| 46|
| 50|
| 62|
| 68|
| 83|
| 86|
| 97|
+---+



# Collect Fuction

In [31]:
data = [
    (1, 'sai', 2000),
    (2, 'pawan', 3000)
]

schema = ['Id', 'name', 'salary']

df =  spark.createDataFrame(data, schema)
df.show()

lst_of_rows = df.collect()
print("Type of list element: ",type(lst_of_rows[0]))
print()

for row in lst_of_rows:
    print(row.Id, row.name, row.salary)

'''
Note: this function is not recommended on very large data, as to perform this function we need collect all the data on one node 
and we may face out of memory error.
'''

+---+-----+------+
| Id| name|salary|
+---+-----+------+
|  1|  sai|  2000|
|  2|pawan|  3000|
+---+-----+------+

Type of list element:  <class 'pyspark.sql.types.Row'>

1 sai 2000
2 pawan 3000


'\nNote: this function is not recommended on very large data, as to perform this function we need collect all the data on one node \nand we may face out of memory error.\n'

# Transfrom Function

In [32]:
data = [
    (1, 'sai', 20),
    (2, 'pawan', 30)
]
schema = ['id', 'name', 'salary in Thousnds']
df =  spark.createDataFrame(data, schema)
df.show()

from pyspark.sql.functions import upper, col

# So we need to transform the DataFrame such that name values are in uppercase and the salary are in lakhs and not in thousands.

# Transform function will help to incorporate multiple tranforamtions under one function.
def transfrom_data(df):
    df = df.withColumn('name', upper('name'))
    return df.withColumn('salary in Lakhs', (col('salary in Thousnds') * 10000) / 100000).drop('Salary in Thousnds')

df.transform(transfrom_data).show()


+---+-----+------------------+
| id| name|salary in Thousnds|
+---+-----+------------------+
|  1|  sai|                20|
|  2|pawan|                30|
+---+-----+------------------+

+---+-----+---------------+
| id| name|salary in Lakhs|
+---+-----+---------------+
|  1|  SAI|            2.0|
|  2|PAWAN|            3.0|
+---+-----+---------------+



In [33]:
# Exploring another transfrom function.
from pyspark.sql.functions import transform, upper, col
# This transform will only work on Array Type columns.

data = [
    (1, 'sai', ['c', 'java']),
    (2, 'pawan', ['python', 'javascript'])
]
schema = ['id', 'name', 'languages']
df =  spark.createDataFrame(data, schema)
df.show()

# Using lambda function
df.withColumn('Langunages(Upper)', transform('languages', lambda language: upper(language))).show()

# Using normal function
def upper_lan(language):
    return upper(language)
df.withColumn('Langunages(Upper)', transform('languages', upper_lan)).show()


+---+-----+--------------------+
| id| name|           languages|
+---+-----+--------------------+
|  1|  sai|           [c, java]|
|  2|pawan|[python, javascript]|
+---+-----+--------------------+

+---+-----+--------------------+--------------------+
| id| name|           languages|   Langunages(Upper)|
+---+-----+--------------------+--------------------+
|  1|  sai|           [c, java]|           [C, JAVA]|
|  2|pawan|[python, javascript]|[PYTHON, JAVASCRIPT]|
+---+-----+--------------------+--------------------+

+---+-----+--------------------+--------------------+
| id| name|           languages|   Langunages(Upper)|
+---+-----+--------------------+--------------------+
|  1|  sai|           [c, java]|           [C, JAVA]|
|  2|pawan|[python, javascript]|[PYTHON, JAVASCRIPT]|
+---+-----+--------------------+--------------------+



# Creating or Replacing TempView

In [34]:
data = [
    (1, 'sai', 20),
    (2, 'pawan', 30)
]
schema = ['id', 'name', 'salary']
df =  spark.createDataFrame(data, schema)
df.show()


# this will create a temporary view on the dataFrame which can be accessed only in that session.
df.createOrReplaceTempView('employees')
df1 = spark.sql('select name, salary from employees')
df1.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  sai|    20|
|  2|pawan|    30|
+---+-----+------+

+-----+------+
| name|salary|
+-----+------+
|  sai|    20|
|pawan|    30|
+-----+------+



# Create or Replace global TempView

In [35]:
data = [
    (1, 'sai', 20),
    (2, 'pawan', 30)
]
schema = ['id', 'name', 'salary']
df =  spark.createDataFrame(data, schema)
df.show()


# this will create a view on the dataframe accross all the session.
df.createOrReplaceGlobalTempView('emp')
df1 = spark.sql('select name, salary from global_temp.emp')
df1.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  sai|    20|
|  2|pawan|    30|
+---+-----+------+

+-----+------+
| name|salary|
+-----+------+
|  sai|    20|
|pawan|    30|
+-----+------+



24/03/16 22:30:36 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [36]:
spark.catalog.listTables(spark.catalog.currentDatabase())  # default database.

[Table(name='employees', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [37]:
spark.catalog.listTables('global_temp')

[Table(name='emp', catalog=None, namespace=['global_temp'], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='employees', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [38]:
# drop temp view
# spark.catalog.dropTempView("employees")

# User Defined Functions

In [39]:
from pyspark.sql.functions import udf, collect_list
from pyspark.sql.types import IntegerType, ArrayType

In [40]:
data = [
    (1, 'sai', 2000, 1000),
    (2, 'pawan', 3000, 500)
]
schema = ['Id', 'name', 'salary', 'bonus']
df =  spark.createDataFrame(data, schema)
df.show()

# Creating a Function that will add salary and bonus, and name it as total pay
def total_pay(salary:int, bonus:int)->int:
    return salary + bonus
total_payment = udf(total_pay, IntegerType())
df.withColumn('total_pay', total_payment(df.salary, df.bonus)).show()

# Using annotations
# Creating a function to return a list of with elements as salary and bonus.
@udf(ArrayType(elementType=IntegerType()))
def get_array(*args):
    return list(args)
df.withColumn('array', get_array(df.salary, df.bonus)).show()

+---+-----+------+-----+
| Id| name|salary|bonus|
+---+-----+------+-----+
|  1|  sai|  2000| 1000|
|  2|pawan|  3000|  500|
+---+-----+------+-----+

+---+-----+------+-----+---------+
| Id| name|salary|bonus|total_pay|
+---+-----+------+-----+---------+
|  1|  sai|  2000| 1000|     3000|
|  2|pawan|  3000|  500|     3500|
+---+-----+------+-----+---------+

+---+-----+------+-----+------------+
| Id| name|salary|bonus|       array|
+---+-----+------+-----+------------+
|  1|  sai|  2000| 1000|[2000, 1000]|
|  2|pawan|  3000|  500| [3000, 500]|
+---+-----+------+-----+------------+



In [41]:
# Registering the fucntion in registrey to access it along with sql command.
spark.udf.register(name = 'total_payment', f = total_payment)
spark.sql('SELECT * FROM employees').show()
spark.sql('SELECT *, total_payment(salary, 1000) as totPay FROM employees').show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  sai|    20|
|  2|pawan|    30|
+---+-----+------+

+---+-----+------+------+
| id| name|salary|totPay|
+---+-----+------+------+
|  1|  sai|    20|  1020|
|  2|pawan|    30|  1030|
+---+-----+------+------+

