First, create a Spark session, the entry point for DataFrame and SQL functionality.

In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Dataframe').getOrCreate()
spark

23/07/28 11:07:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


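PySpark can read CSV files directly; the result is a PySpark DataFrame.

`show()` prints the content of the DataFrame (the first 20 rows by default), and `printSchema()` prints its column names and types.

`header=True` takes the column names from the first line of the file, and `inferSchema=True` makes Spark infer each column's type; without it, every column is read as a string.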

In [11]:
df_pyspark = spark.read.csv('test1.csv', header=True, inferSchema=True)
print(type(df_pyspark))
df_pyspark.show()
df_pyspark.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



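To get columns from the DataFrame, one can either index it with the column names or use `select`; both return a new DataFrame containing just those columns. Nothing is displayed until `show()` is called, which is why the `select` cell below prints a table while the indexing cell only prints the schema. Indexing with a single name, e.g. `df_pyspark['Name']`, returns a `Column` expression rather than a DataFrame.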

In [6]:
df_pyspark.select(['Name','Experience']).show()

+---------+----------+
|     Name|Experience|
+---------+----------+
|    Krish|        10|
|Sudhanshu|         8|
|    Sunny|         4|
|     Paul|         3|
|   Harsha|         1|
|  Shubham|         2|
+---------+----------+



In [7]:
df_pyspark['Name','Experience']

DataFrame[Name: string, Experience: int]

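`describe()` computes summary statistics for each column: count, mean, standard deviation, min and max. For the string column `Name`, mean and stddev are null, and min/max are compared alphabetically.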

In [8]:
df_pyspark.describe().show()

23/07/27 15:18:46 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+




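`withColumn` adds a column (or replaces an existing column with the same name), `drop` removes one, and `withColumnRenamed` renames one. Each call returns a new DataFrame, so the result must be assigned to keep the change; the renamed DataFrame below is only displayed, and `df_pyspark` keeps its `Name` column.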

In [9]:
df_pyspark = df_pyspark.withColumn('Experience After 2 year', df_pyspark['Experience'] + 2)
df_pyspark.show()
df_pyspark = df_pyspark.drop('Experience After 2 year')
df_pyspark.show()
df_pyspark.withColumnRenamed('Name','New Name').show()

+---------+---+----------+------+-----------------------+
|     Name|age|Experience|Salary|Experience After 2 year|
+---------+---+----------+------+-----------------------+
|    Krish| 31|        10| 30000|                     12|
|Sudhanshu| 30|         8| 25000|                     10|
|    Sunny| 29|         4| 20000|                      6|
|     Paul| 24|         3| 20000|                      5|
|   Harsha| 21|         1| 15000|                      3|
|  Shubham| 23|         2| 18000|                      4|
+---------+---+----------+------+-----------------------+

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

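+---------+---+----------+------+
| New Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+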

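Rows can be selected with `filter` (or its alias `where`). Conditions are combined with `&` (and), `|` (or) and `~` (not), and each comparison needs its own parentheses because these operators bind more tightly than comparisons in Python. The cell below keeps the rows with a salary of at least 25000 or at most 18000.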

In [12]:
df_pyspark.filter((df_pyspark['Salary'] >= 25000) | 
                  ~(df_pyspark['Salary'] > 18000)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



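PySpark DataFrames also support group-by aggregations through `groupBy`.

For small datasets Spark can even be slower than pandas, since every job carries scheduling overhead. When reading the timings below, keep in mind that `%%time` measures only the Python process (most of Spark's work happens in the JVM, hence the tiny CPU times next to long wall times), and that Spark transformations are lazy: `groupBy(...).agg(...)` only builds a query plan, and the aggregation runs when an action such as `show()`, `collect()` or `count()` is called.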

In [24]:
%%time
import pandas as pd
df = pd.read_csv('appliances.csv')

CPU times: user 6.74 s, sys: 1.05 s, total: 7.79 s
Wall time: 12.2 s


In [30]:
%%time
result = df.groupby(['brandId'])['rank'].mean()

CPU times: user 3.54 ms, sys: 1.65 ms, total: 5.19 ms
Wall time: 3.8 ms


In [9]:
%%time
df_spark = spark.read.csv('appliances.csv', header=True, inferSchema=True)



CPU times: user 14.9 ms, sys: 5.99 ms, total: 20.9 ms
Wall time: 9.71 s



In [27]:
%%time
result = df_spark.groupBy('brandId').agg({'rank': 'mean'})

CPU times: user 4.89 ms, sys: 3 ms, total: 7.89 ms
Wall time: 244 ms


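SQL statements can also be run once a DataFrame is registered as a temporary view. The shorter wall time below should not be read as SQL being faster: like the `groupBy` cell above, `spark.sql(...)` only builds a lazy plan here, and SQL queries and DataFrame operations go through the same Catalyst optimizer, so equivalent queries generally perform the same.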

In [23]:
df_spark.createOrReplaceTempView('appliances')

In [24]:
%%time
result = spark.sql("SELECT AVG(rank) FROM appliances GROUP BY brandId")

CPU times: user 2.77 ms, sys: 2.23 ms, total: 5 ms
Wall time: 118 ms


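`rdd.getNumPartitions()` returns the number of partitions of a DataFrame's underlying RDD, and `repartition(n)` reshuffles the data into `n` partitions.

`saveAsTextFile('output')` writes a directory named `output` containing one `part-*` file per partition. The call fails if that directory already exists, so remove it before rerunning the code.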

In [19]:
df_spark.rdd.getNumPartitions()

5

In [61]:
q = spark.sql("SELECT imageUrl, SUM(rank) FROM appliances GROUP BY imageUrl")
print(q.rdd.getNumPartitions())
print(q.rdd.repartition(10).getNumPartitions())
# Each row is (imageUrl, sum); write one tab-separated line per row into the 'output' directory
# q.rdd.map(lambda x: str(x[0]) + '\t' + str(x[1])).saveAsTextFile('output')

23/07/28 13:58:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/07/28 13:58:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/07/28 13:58:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

5
10


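`SQLContext` has been deprecated since Spark 3.0 in favor of `SparkSession`, and `registerTempTable` is deprecated in favor of `createOrReplaceTempView`. The cells below use the legacy API.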


In [1]:
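from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)  # deprecated: SparkSession.builder.getOrCreate() replaces it

# Without header=True the header line is read as a data row and columns are named _c0, _c1, ...
tips = sqlContext.read.csv("tips.csv")
tips.registerTempTable("tips")  # deprecated: use createOrReplaceTempView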

23/07/28 11:04:44 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.2.162 instead (on interface en0)
23/07/28 11:04:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/28 11:04:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
type(tips)

pyspark.sql.dataframe.DataFrame

In [5]:
q11 = sqlContext.sql('SELECT * FROM tips')
q11.rdd.map(lambda x: str(x[1]) + '\t' + str(x[0])).saveAsTextFile('output')
