In [0]:
## sparksession for df
## spark conf for rdd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("Spark df")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.appName('Spark df').getOrCreate()

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col, lit

In [0]:
schema = StructType([
    StructField('age', IntegerType(), True),
    StructField('gender', StringType(), True),
    StructField('name', StringType(), True),
    StructField('course', StringType(), True),
    StructField('roll', StringType(), True),
    StructField('marks', IntegerType(), True),
    StructField('email', StringType(), True),
])

df = spark.read.options(header='True').schema(schema).csv('/FileStore/tables/StudentData.csv')
df.show(5)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|02984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB|32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA|41487|   41|Claude Panos_Judi...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 5 rows



In [0]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)



In [0]:
## Create df from RDDs


+---+
|age|
+---+
| 28|
| 29|
| 28|
| 29|
| 28|
+---+
only showing top 5 rows



In [0]:
# df.select('age').show(5)
# df.select(df.name, df.age).show(5)

In [0]:

# df.select(col('name')).show(5)

In [0]:
# df.select('*').show(5) 

In [0]:
# df.select(df.columns[2:4]).show(5)

In [0]:
## withColumn is used to manipulate data of a column
df = df.withColumn('roll', col('roll').cast('String'))

In [0]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)



In [0]:
df.withColumn('updated_marks', df.marks + 10).show()

+---+------+----------------+------+------+-----+--------------------+-------------+
|age|gender|            name|course|  roll|marks|               email|updated_marks|
+---+------+----------------+------+------+-----+--------------------+-------------+
| 28|Female| Hubert Oliveras|    DB| 02984|   59|Annika Hoffman_Na...|           69|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|           72|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|           55|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|           39|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|           51|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|           42|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|           79|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|           95|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_C

In [0]:
df.withColumn('Country', lit('USA')).show(5)

+---+------+----------------+------+-----+-----+--------------------+-------+
|age|gender|            name|course| roll|marks|               email|Country|
+---+------+----------------+------+-----+-----+--------------------+-------+
| 28|Female| Hubert Oliveras|    DB|02984|   59|Annika Hoffman_Na...|    USA|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|    USA|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|    USA|
| 29|Female|    Elenore Choy|    DB|32877|   29|Billi Clore_Mitzi...|    USA|
| 28|  Male|  Sheryll Towler|   DSA|41487|   41|Claude Panos_Judi...|    USA|
+---+------+----------------+------+-----+-----+--------------------+-------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import udf, regexp_replace


In [0]:
# Define the lambda function to extract only letters from the name
extract_letters = udf(lambda x: regexp_replace(x, '[^a-zA-Z ]', ''))


In [0]:
# Apply the lambda function to the name column and create a new column "formatted_names"
df.withColumn("formatted_names", extract_letters(df.name)).show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPythonException[0m                           Traceback (most recent call last)
[0;32m<command-3392408804453874>[0m in [0;36m<cell line: 2>[0;34m()[0m
[1;32m      1[0m [0;31m# Apply the lambda function to the name column and create a new column "formatted_names"[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf[0m[0;34m.[0m[0mwithColumn[0m[0;34m([0m[0;34m"formatted_names"[0m[0;34m,[0m [0mextract_letters[0m[0;34m([0m[0mdf[0m[0;34m.[0m[0mname[0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m             [0;32m

In [0]:
df1 = df.withColumnRenamed('gender', 'sex').show()

+---+------+----------------+------+------+-----+--------------------+
|age|   sex|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 02984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
 df.filter(df.course == 'DB').show()

+---+------+-----------------+------+-------+-----+--------------------+
|age|gender|             name|course|   roll|marks|               email|
+---+------+-----------------+------+-------+-----+--------------------+
| 28|Female|  Hubert Oliveras|    DB|  02984|   59|Annika Hoffman_Na...|
| 29|Female|     Elenore Choy|    DB|  32877|   29|Billi Clore_Mitzi...|
| 29|  Male|  Ernest Rossbach|    DB| 111449|   53|Maybell Duguay_Ab...|
| 28|Female|   Latia Vanhoose|    DB| 122502|   27|Latia Vanhoose_Mi...|
| 29|Female|   Latia Vanhoose|    DB| 152159|   27|Claude Panos_Sant...|
| 28|Female| Mickey Cortright|    DB| 192537|   62|Ernest Rossbach_M...|
| 28|Female|      Anna Santos|    DB| 311589|   79|Celeste Lollis_Mi...|
| 28|  Male|    Kizzy Brenner|    DB| 381712|   36|Paris Hutton_Kena...|
| 28|  Male| Toshiko Hillyard|    DB| 392218|   47|Leontine Phillips...|
| 29|  Male|     Paris Hutton|    DB| 481229|   57|Clementina Menke_...|
| 28|Female| Mickey Cortright|    DB| 551389|   43|

In [0]:
df.filter(df.course.isin([]))
df.filter(df.course.startswith('D')).show()
df.filter(df.course.endswith('D')).show()
df.filter(df.course.contains('D')).show()
df.filter(df.course.like('%D')).show()

In [0]:
df.select('age', 'gender', 'course').dropDuplicates()
df.select('age', 'gender', 'course').distinct()
df.select('age', 'gender', 'course').count()

In [0]:
## SORT ORDER BY
df.sort('marks', 'age').show()
df.orderBy(df.marks.asc(), df.age.desc()).show()

In [0]:
## groupby
df.groupBy('gender').sum('marks').show()
df.groupBy('gender', 'course').sum('marks').show() 

In [0]:
from pyspark.sql.functions import sum, avg, max, min, mean, count
df.groupBy('course').agg(count('*').alias('total_count'), sum('marks'), max('marks'), avg('marks')).show()
df.groupBy('course', 'gender ').agg(count('*').alias('total_count'), sum('marks'), max('marks'), avg('marks')).show()

In [0]:
df2 = df.filter(df.gender == 'Male').groupBy('course', 'gender').agg(count('*').alias('total_enrollments'))
df2.filter(col('total_enrollments') > 85).show()

In [0]:
def get_incr(state, salary, bonus):
  sum = 0
  if state == "NY":
    sum = salary * 0.10
    sum += bonus * 0.05
  elif state == "CA":
    sum = salary * 0.12
    sum += bonus * 0.03
  return sum

incrUDF = udf(lambda x,y,z: get_incr(x,y,z), DoubleType())

df.withColumn("increment", incrUDF(df.state, df.salary, df.bonus)).show()

In [0]:
#cache and persist 