# Pyspark Basics

In [2]:
#create pyspark session
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('pyspark').getOrCreate()

In [3]:
#read the data file
df=spark.read.csv('conversion_data.csv')

In [4]:
df.show(5)

+-------+---+--------+------+-------------------+---------+
|    _c0|_c1|     _c2|   _c3|                _c4|      _c5|
+-------+---+--------+------+-------------------+---------+
|country|age|new_user|source|total_pages_visited|converted|
|     UK| 25|       1|   Ads|                  1|        0|
|     US| 23|       1|   Seo|                  5|        0|
|     US| 28|       1|   Seo|                  4|        0|
|  China| 39|       1|   Seo|                  5|        0|
+-------+---+--------+------+-------------------+---------+
only showing top 5 rows



In [5]:
#read the data file
df=spark.read.csv('conversion_data.csv',header=True)

In [6]:
df.show(5)

+-------+---+--------+------+-------------------+---------+
|country|age|new_user|source|total_pages_visited|converted|
+-------+---+--------+------+-------------------+---------+
|     UK| 25|       1|   Ads|                  1|        0|
|     US| 23|       1|   Seo|                  5|        0|
|     US| 28|       1|   Seo|                  4|        0|
|  China| 39|       1|   Seo|                  5|        0|
|     US| 30|       1|   Seo|                  6|        0|
+-------+---+--------+------+-------------------+---------+
only showing top 5 rows



In [7]:
df.printSchema()

root
 |-- country: string (nullable = true)
 |-- age: string (nullable = true)
 |-- new_user: string (nullable = true)
 |-- source: string (nullable = true)
 |-- total_pages_visited: string (nullable = true)
 |-- converted: string (nullable = true)



In [10]:
#statistical summary for data numerical columns
df.describe().show()

+-------+-------+------------------+-------------------+------+-------------------+-------------------+
|summary|country|               age|           new_user|source|total_pages_visited|          converted|
+-------+-------+------------------+-------------------+------+-------------------+-------------------+
|  count| 316200|            316200|             316200|316200|             316200|             316200|
|   mean|   null|30.569857685009488| 0.6854648956356736|  null|  4.872966476913346|0.03225806451612903|
| stddev|   null| 8.271801801807728|0.46433119036384723|  null|  3.341103757948214|0.17668497535763514|
|    min|  China|               111|                  0|   Ads|                  1|                  0|
|    max|     US|                79|                  1|   Seo|                  9|                  1|
+-------+-------+------------------+-------------------+------+-------------------+-------------------+



In [11]:
#acess dataframe column , we get column object 
df['country']

Column<b'country'>

In [12]:
type(df['country'])

pyspark.sql.column.Column

In [16]:
#access content of colum
df.select('country').show(5)

+-------+
|country|
+-------+
|     UK|
|     US|
|     US|
|  China|
|     US|
+-------+
only showing top 5 rows



In [17]:
#acess multiple columns
df.select(['country','source']).show(5)

+-------+------+
|country|source|
+-------+------+
|     UK|   Ads|
|     US|   Seo|
|     US|   Seo|
|  China|   Seo|
|     US|   Seo|
+-------+------+
only showing top 5 rows



### Add or Remove column 

#### using udf (user defined functions)

In [21]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def country_udf(country):
    if country =='UK':
        return 'Britain'
    elif country =='US':
        return 'USA'
    elif country =='China':
        return 'Asia'
    elif country =='Germany':
        return 'Deustche'
    else:
        return country
        
spark_udf = udf(country_udf, StringType())

df=df.withColumn("country_new", spark_udf(df.country))

In [22]:
df.show(10)

+-------+---+--------+------+-------------------+---------+-----------+
|country|age|new_user|source|total_pages_visited|converted|country_new|
+-------+---+--------+------+-------------------+---------+-----------+
|     UK| 25|       1|   Ads|                  1|        0|    Britain|
|     US| 23|       1|   Seo|                  5|        0|        USA|
|     US| 28|       1|   Seo|                  4|        0|        USA|
|  China| 39|       1|   Seo|                  5|        0|       Asia|
|     US| 30|       1|   Seo|                  6|        0|        USA|
|     US| 31|       0|   Seo|                  1|        0|        USA|
|  China| 27|       1|   Seo|                  4|        0|       Asia|
|     US| 23|       0|   Ads|                  4|        0|        USA|
|     UK| 29|       0|Direct|                  4|        0|    Britain|
|     US| 25|       0|   Ads|                  2|        0|        USA|
+-------+---+--------+------+-------------------+---------+-----

#### without using udf 

In [26]:
#create new column with age +2  value
df=df.withColumn('new_age',df['age'] +2)

In [27]:
df.show(10)

+-------+---+--------+------+-------------------+---------+-----------+-------+
|country|age|new_user|source|total_pages_visited|converted|country_new|new_age|
+-------+---+--------+------+-------------------+---------+-----------+-------+
|     UK| 25|       1|   Ads|                  1|        0|    Britain|   27.0|
|     US| 23|       1|   Seo|                  5|        0|        USA|   25.0|
|     US| 28|       1|   Seo|                  4|        0|        USA|   30.0|
|  China| 39|       1|   Seo|                  5|        0|       Asia|   41.0|
|     US| 30|       1|   Seo|                  6|        0|        USA|   32.0|
|     US| 31|       0|   Seo|                  1|        0|        USA|   33.0|
|  China| 27|       1|   Seo|                  4|        0|       Asia|   29.0|
|     US| 23|       0|   Ads|                  4|        0|        USA|   25.0|
|     UK| 29|       0|Direct|                  4|        0|    Britain|   31.0|
|     US| 25|       0|   Ads|           

### Drop /Delete columns 

In [28]:
#delete the new_age column
df=df.drop('new_age')

In [30]:
#delete the country_new column
df=df.drop('country_new')

In [31]:
df.show(5)

+-------+---+--------+------+-------------------+---------+
|country|age|new_user|source|total_pages_visited|converted|
+-------+---+--------+------+-------------------+---------+
|     UK| 25|       1|   Ads|                  1|        0|
|     US| 23|       1|   Seo|                  5|        0|
|     US| 28|       1|   Seo|                  4|        0|
|  China| 39|       1|   Seo|                  5|        0|
|     US| 30|       1|   Seo|                  6|        0|
+-------+---+--------+------+-------------------+---------+
only showing top 5 rows



# Acess row objects of dataframe

In [32]:
#access first 3 rows
df.head(3)

[Row(country='UK', age='25', new_user='1', source='Ads', total_pages_visited='1', converted='0'),
 Row(country='US', age='23', new_user='1', source='Seo', total_pages_visited='5', converted='0'),
 Row(country='US', age='28', new_user='1', source='Seo', total_pages_visited='4', converted='0')]

In [33]:
#access first row object 
df.head(3)[0]

Row(country='UK', age='25', new_user='1', source='Ads', total_pages_visited='1', converted='0')

In [34]:
#access first row object
df.head(3)[0][0]

'UK'

## Filtering 

In [42]:
#filter records where age of user is more than 75 years
df.filter(df['age'] >75).show(5)

+-------+---+--------+------+-------------------+---------+
|country|age|new_user|source|total_pages_visited|converted|
+-------+---+--------+------+-------------------+---------+
|Germany|123|       0|   Seo|                 15|        1|
|     US| 77|       0|Direct|                  4|        0|
|     US| 79|       1|Direct|                  1|        0|
|     UK|111|       0|   Ads|                 10|        1|
+-------+---+--------+------+-------------------+---------+



In [43]:
#filter records and show only country and converted status of that user
df.filter(df['age'] > 75).select(['country','converted','age']).show(5)

+-------+---------+---+
|country|converted|age|
+-------+---------+---+
|Germany|        1|123|
|     US|        0| 77|
|     US|        0| 79|
|     UK|        1|111|
+-------+---------+---+



### Multiple filter conditions

In [44]:
#select people over 75 years only from US
df.filter(df['age'] > 75).filter(df['country'] =='US').show(5)

+-------+---+--------+------+-------------------+---------+
|country|age|new_user|source|total_pages_visited|converted|
+-------+---+--------+------+-------------------+---------+
|     US| 77|       0|Direct|                  4|        0|
|     US| 79|       1|Direct|                  1|        0|
+-------+---+--------+------+-------------------+---------+



In [47]:
#selet users who have more less than 3 visited pages and are still converted from Germany 
df.filter(df['total_pages_visited'] < 3).filter(df['converted']==1).filter(df['country'] =='Germany').show(5)

+-------+---+--------+------+-------------------+---------+
|country|age|new_user|source|total_pages_visited|converted|
+-------+---+--------+------+-------------------+---------+
|Germany| 31|       0|Direct|                  2|        1|
+-------+---+--------+------+-------------------+---------+



## Count Records 

In [48]:
#total records in df 
df.count()

316200

In [50]:
# Frequency count of column values
df.groupBy('country').count().show(5)

+-------+------+
|country| count|
+-------+------+
|Germany| 13056|
|  China| 76602|
|     US|178092|
|     UK| 48450|
+-------+------+



In [53]:
## Ordered Frequency count 
df.groupBy('country').count().orderBy('count',ascending=False).show(5)

+-------+------+
|country| count|
+-------+------+
|     US|178092|
|  China| 76602|
|     UK| 48450|
|Germany| 13056|
+-------+------+



In [55]:
#Total converted vs non converted user counts
df.groupBy('converted').count().show(2)

+---------+------+
|converted| count|
+---------+------+
|        0|306000|
|        1| 10200|
+---------+------+

