In [2]:
import findspark

In [3]:
findspark.init()

In [4]:
import pyspark

In [5]:
from pyspark.sql import SparkSession

In [6]:
# Reuse the active Spark session if one exists, otherwise start a new one
# registered under the application name 'data_processing'.
spark = (
    SparkSession.builder
    .appName('data_processing')
    .getOrCreate()
)

In [60]:
# Load the sample CSV: the first line supplies the column names and Spark
# infers each column's type from the data.
df = spark.read.csv(
    r'sample.csv',
    header=True,
    inferSchema=True,
)

In [8]:
df

DataFrame[ratings: int, age: int, experience: double, family: int, mobile: string]

In [61]:
df.show()

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      3| 32|       9.0|     3|   Vivo|
|      3| 27|      13.0|     3|  Apple|
|      4| 22|       2.5|     0|Samsung|
|      4| 37|      16.5|     4|  Apple|
|      5| 27|       9.0|     1|     MI|
|      4| 27|       9.0|     0|   Oppo|
|      5| 37|      23.0|     5|   Vivo|
|      5| 37|      23.0|     5|Samsung|
|      3| 22|       2.5|     0|  Apple|
|      3| 27|       6.0|     0|     MI|
|      2| 27|       6.0|     2|   Oppo|
|      5| 27|       6.0|     2|Samsung|
|      3| 37|      16.5|     5|  Apple|
|      5| 27|       6.0|     0|     MI|
|      4| 22|       6.0|     1|   Oppo|
|      4| 37|       9.0|     2|Samsung|
|      4| 27|       6.0|     1|  Apple|
|      1| 37|      23.0|     5|     MI|
|      2| 42|      23.0|     2|   Oppo|
|      4| 37|       6.0|     0|   Vivo|
+-------+---+----------+------+-------+
only showing top 20 rows



In [10]:
# Shape of the DataFrame as (number of rows, number of columns).
(df.count(), len(df.columns))

(33, 5)

## Print schema

In [34]:
df.printSchema()

root
 |-- ratings: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: double (nullable = true)
 |-- family: integer (nullable = true)
 |-- mobile: string (nullable = true)



## Select columns

In [38]:
df.select('age','mobile').show(5)

+---+-------+
|age| mobile|
+---+-------+
| 32|   Vivo|
| 27|  Apple|
| 22|Samsung|
| 37|  Apple|
| 27|     MI|
+---+-------+
only showing top 5 rows



## Describe

In [42]:
df.describe().show()

+-------+------------------+------------------+------------------+------------------+------+
|summary|           ratings|               age|        experience|            family|mobile|
+-------+------------------+------------------+------------------+------------------+------+
|  count|                33|                33|                33|                33|    33|
|   mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.8181818181818181|  null|
| stddev|1.1188806636071336|  6.18527087180309| 6.770731351213326|1.8448330794164254|  null|
|    min|                 1|                22|               2.5|                 0| Apple|
|    max|                 5|                42|              23.0|                 5|  Vivo|
+-------+------------------+------------------+------------------+------------------+------+



## Adding a new column

In [63]:
# Append a derived column holding each row's age plus ten.
df = df.withColumn(
    'newly_added',
    df['age'] + 10,
)

In [65]:
df.show()

+-------+---+----------+------+-------+-----------+
|ratings|age|experience|family| mobile|newly_added|
+-------+---+----------+------+-------+-----------+
|      3| 32|       9.0|     3|   Vivo|         42|
|      3| 27|      13.0|     3|  Apple|         37|
|      4| 22|       2.5|     0|Samsung|         32|
|      4| 37|      16.5|     4|  Apple|         47|
|      5| 27|       9.0|     1|     MI|         37|
|      4| 27|       9.0|     0|   Oppo|         37|
|      5| 37|      23.0|     5|   Vivo|         47|
|      5| 37|      23.0|     5|Samsung|         47|
|      3| 22|       2.5|     0|  Apple|         32|
|      3| 27|       6.0|     0|     MI|         37|
|      2| 27|       6.0|     2|   Oppo|         37|
|      5| 27|       6.0|     2|Samsung|         37|
|      3| 37|      16.5|     5|  Apple|         47|
|      5| 27|       6.0|     0|     MI|         37|
|      4| 22|       6.0|     1|   Oppo|         32|
|      4| 37|       9.0|     2|Samsung|         47|
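|      4| 27|       6.0|     1|  Apple|         37|
|      1| 37|      23.0|     5|     MI|         47|
|      2| 42|      23.0|     2|   Oppo|         52|
|      4| 37|       6.0|     0|   Vivo|         47|
+-------+---+----------+------+-------+-----------+
only showing top 20 rows
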

## Filtering data

In [67]:
df.filter(df['mobile'] == 'Vivo').show()

+-------+---+----------+------+------+-----------+
|ratings|age|experience|family|mobile|newly_added|
+-------+---+----------+------+------+-----------+
|      3| 32|       9.0|     3|  Vivo|         42|
|      5| 37|      23.0|     5|  Vivo|         47|
|      4| 37|       6.0|     0|  Vivo|         47|
|      5| 37|      13.0|     1|  Vivo|         47|
|      4| 37|       6.0|     0|  Vivo|         47|
+-------+---+----------+------+------+-----------+



## Chained filters

In [81]:
# Apply two filters back to back: keep rows with age below 28, then keep
# only the Apple rows among those.
(
    df
    .filter(df['age'] < 28)
    .filter(df['mobile'] == 'Apple')
    .show()
)

+-------+---+----------+------+------+-----------+
|ratings|age|experience|family|mobile|newly_added|
+-------+---+----------+------+------+-----------+
|      3| 27|      13.0|     3| Apple|         37|
|      3| 22|       2.5|     0| Apple|         32|
|      4| 27|       6.0|     1| Apple|         37|
|      4| 27|       6.0|     1| Apple|         37|
+-------+---+----------+------+------+-----------+



## Converting to a pandas df

In [69]:
df = df.toPandas()

## Converting back to spark df

In [72]:
df = spark.createDataFrame(df)

## Distinct values

In [82]:
df.select('mobile').distinct().show()

+-------+
| mobile|
+-------+
|     MI|
|   Oppo|
|Samsung|
|   Vivo|
|  Apple|
+-------+



## Groupby

In [93]:
# Number of rows per brand, sorted alphabetically by brand.
# orderBy() only looks at the `ascending` keyword; the former
# `descending=False` argument was not a recognised option and had no effect.
df.groupBy('mobile').count().orderBy('mobile', ascending=True).show()

+-------+-----+
| mobile|count|
+-------+-----+
|  Apple|    7|
|     MI|    8|
|   Oppo|    7|
|Samsung|    6|
|   Vivo|    5|
+-------+-----+



In [94]:
# Average of every numeric column within each brand.
df.groupBy('mobile').mean().show()

+-------+------------------+------------------+------------------+------------------+------------------+
| mobile|      avg(ratings)|          avg(age)|   avg(experience)|       avg(family)|  avg(newly_added)|
+-------+------------------+------------------+------------------+------------------+------------------+
|     MI|               3.5|            30.125|           10.1875|             1.375|            40.125|
|   Oppo| 2.857142857142857|28.428571428571427|10.357142857142858|1.4285714285714286| 38.42857142857143|
|Samsung| 4.166666666666667|28.666666666666668| 8.666666666666666|1.8333333333333333|38.666666666666664|
|   Vivo|               4.2|              36.0|              11.4|               1.8|              46.0|
|  Apple|3.4285714285714284|30.571428571428573|              11.0|2.7142857142857144| 40.57142857142857|
+-------+------------------+------------------+------------------+------------------+------------------+



In [106]:
# Maximum of every numeric column within each brand.
df.groupBy('mobile').max().show()

+-------+------------+--------+---------------+-----------+----------------+
| mobile|max(ratings)|max(age)|max(experience)|max(family)|max(newly_added)|
+-------+------------+--------+---------------+-----------+----------------+
|     MI|           5|      42|           23.0|          5|              52|
|   Oppo|           4|      42|           23.0|          2|              52|
|Samsung|           5|      37|           23.0|          5|              47|
|   Vivo|           5|      37|           23.0|          5|              47|
|  Apple|           4|      37|           16.5|          5|              47|
+-------+------------+--------+---------------+-----------+----------------+



## Aggregations

In [109]:
# Dictionary form of agg(): maps a column name to the aggregate to compute,
# here the maximum experience per brand.
(
    df
    .groupBy('mobile')
    .agg({'experience': 'max'})
    .show()
)

+-------+---------------+
| mobile|max(experience)|
+-------+---------------+
|     MI|           23.0|
|   Oppo|           23.0|
|Samsung|           23.0|
|   Vivo|           23.0|
|  Apple|           16.5|
+-------+---------------+



In [114]:
# Highest experience among the MI rows. Aggregating only `experience` avoids
# computing max() for every numeric column and then picking one of them by
# its generated name; the result still has the columns
# `mobile` and `max(experience)`.
df.filter(df['mobile'] == 'MI').groupBy('mobile').max('experience').show()

+------+---------------+
|mobile|max(experience)|
+------+---------------+
|    MI|           23.0|
+------+---------------+



## User Defined Functions

In [115]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType

In [116]:
def price_(brand):
    """Classify a mobile brand into a price-range label.

    Parameters
    ----------
    brand : str
        Brand name, e.g. a value from the ``mobile`` column.

    Returns
    -------
    str
        'High price' for Samsung and Apple, 'Mid Price' for MI, and
        'Low price' for any other value.
    """
    if brand == 'MI':
        return 'Mid Price'
    if brand == 'Samsung' or brand == 'Apple':
        return 'High price'
    # Everything that is not explicitly listed falls into the lowest tier.
    return 'Low price'

In [118]:
price_('Samsung')

'High price'

In [119]:
# Wrap the plain Python classifier as a Spark UDF that yields string values.
brand_udf = udf(price_, returnType=StringType())

In [131]:
df.withColumn('price range',brand_udf(df['mobile'])).show(5)

+-------+---+----------+------+-------+-----------+-----------+
|ratings|age|experience|family| mobile|newly_added|price range|
+-------+---+----------+------+-------+-----------+-----------+
|      3| 32|       9.0|     3|   Vivo|         42|  Low price|
|      3| 27|      13.0|     3|  Apple|         37| High price|
|      4| 22|       2.5|     0|Samsung|         32| High price|
|      4| 37|      16.5|     4|  Apple|         47| High price|
|      5| 27|       9.0|     1|     MI|         37|  Mid Price|
+-------+---+----------+------+-------+-----------+-----------+
only showing top 5 rows



In [136]:
# Functions with two arguments (2 columns)

In [133]:
def func(age, exp):
    """Return the sum of an age and an experience value as a float.

    The function is wrapped as a Spark UDF declared with ``DoubleType``,
    so it is written to be safe for that use.

    Parameters
    ----------
    age : int or float or None
        Value from the ``age`` column.
    exp : int or float or None
        Value from the ``experience`` column.

    Returns
    -------
    float or None
        ``age + exp`` as a float, or ``None`` when either input is ``None``.
    """
    # A missing value arrives in a Python UDF as None; propagate it instead of
    # raising TypeError on `None + number`.
    if age is None or exp is None:
        return None
    # Always hand back a Python float so the value matches the declared
    # DoubleType even when both inputs are integers.
    return float(age + exp)

In [134]:
# Register the two-argument adder as a Spark UDF producing double values.
f = udf(func, returnType=DoubleType())

In [135]:
df.withColumn('add', f(df['age'],df['experience'])).show(5)

+-------+---+----------+------+-------+-----------+----+
|ratings|age|experience|family| mobile|newly_added| add|
+-------+---+----------+------+-------+-----------+----+
|      3| 32|       9.0|     3|   Vivo|         42|41.0|
|      3| 27|      13.0|     3|  Apple|         37|40.0|
|      4| 22|       2.5|     0|Samsung|         32|24.5|
|      4| 37|      16.5|     4|  Apple|         47|53.5|
|      5| 27|       9.0|     1|     MI|         37|36.0|
+-------+---+----------+------+-------+-----------+----+
only showing top 5 rows



## Drop duplicates

In [137]:
# Remove rows that are exact duplicates across all columns
# (drop_duplicates is an alias of dropDuplicates).
df = df.dropDuplicates()

In [139]:
df.count()

26

In [140]:
help(df.drop_duplicates)

Help on method dropDuplicates in module pyspark.sql.dataframe:

dropDuplicates(subset=None) method of pyspark.sql.dataframe.DataFrame instance
    :func:`drop_duplicates` is an alias for :func:`dropDuplicates`.
    
    .. versionadded:: 1.4



## Delete columns

In [141]:
# drop() returns a new DataFrame without the `mobile` column; the result is
# only displayed here and not assigned, so `df` itself is unchanged and the
# next cell still shows `mobile`.
df.drop('mobile')

DataFrame[ratings: bigint, age: bigint, experience: double, family: bigint, newly_added: bigint]

In [142]:
df.show(5)

+-------+---+----------+------+------+-----------+
|ratings|age|experience|family|mobile|newly_added|
+-------+---+----------+------+------+-----------+
|      1| 37|      23.0|     5|    MI|         47|
|      4| 27|       9.0|     0|  Oppo|         37|
|      3| 32|       9.0|     3|  Vivo|         42|
|      2| 42|      23.0|     2|  Oppo|         52|
|      5| 27|       9.0|     1|    MI|         37|
+-------+---+----------+------+------+-----------+
only showing top 5 rows



## Writing df

In [143]:
pwd

'C:\\Users\\surie\\Books to notebooks\\ML with pyspark'

In [144]:
# Output directory for the CSV export, inside the notebook's working directory.
# In a raw string each backslash is literal, so the separators must be single;
# the doubled form produced a path containing `\\` between every component.
# NOTE(review): this is an absolute, machine-specific path - consider deriving
# it from the working directory to make the notebook portable.
write_path = r'C:\Users\surie\Books to notebooks\ML with pyspark\df_csv'

In [148]:
# Export the DataFrame as CSV with a header row to the directory in write_path.
(
    df.write
    .format('csv')
    .option('header', 'true')
    .save(write_path)
)