First, we need to start a SparkSession.

# Create SparkSession

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Application name handed to the session builder.
app_name = "BasicOperations"

# Obtain the session through the builder: getOrCreate() hands back an
# existing session when there is one and creates a new one otherwise.
spark = (
    SparkSession.builder
    .appName(app_name)
    .getOrCreate()
)

# List of input corpus

In [3]:
# Shell escape: list the files present in the input_data directory.
!ls "input_data"

appl_stock.csv	ContainsNull.csv  people.json  sales_info.csv


# Read data

Sample data in "people.json" is described as follows:

```
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
```

In [4]:
# Directory holding the sample files; the trailing slash lets a file name
# be appended to it directly.
dir_input_data_path = "./input_data/"
file_input_path = f"{dir_input_data_path}people.json"

# Load the JSON file into a DataFrame. No schema is supplied here, so the
# column types are whatever the reader infers from the data.
df = spark.read.json(file_input_path)

# Overview of the input corpus

Let's take a look at the input data.

## Review data in DataFrame

In [5]:
# Print the DataFrame rows as a text table.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [6]:
# Print the name of every attribute and method available on the DataFrame object.
print(dir(df))

['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_collectAsArrow', '_jcols', '_jdf', '_jmap', '_jseq', '_lazy_rdd', '_repr_html_', '_sc', '_schema', '_sort_cols', '_support_repr_html', 'agg', 'alias', 'approxQuantile', 'cache', 'checkpoint', 'coalesce', 'colRegex', 'collect', 'columns', 'corr', 'count', 'cov', 'createGlobalTempView', 'createOrReplaceGlobalTempView', 'createOrReplaceTempView', 'createTempView', 'crossJoin', 'crosstab', 'cube', 'describe', 'distinct', 'drop', 'dropDuplicates', 'drop_duplicates', 'dropna', 'dtypes', 'exceptAll', 'explain', 'fillna', 'filter', 'first', 'foreach', 'foreachPartition', 'freqItems', 'groupBy', 'groupby', 'head', 'hint', 'i

In [7]:
# Fetch the first two rows; head(n) returns them as a list of Row objects.
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [8]:
# head(1) is a one-element list, so index 0 yields the first Row itself.
df.head(1)[0]

Row(age=None, name='Michael')

## Show Schema of DataFrame

In [9]:
# Print each column's name, type, and nullability as a tree.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



## Show the column names

In [10]:
# The column names as a plain Python list.
df.columns

['age', 'name']

## Get Summary and Description of DataFrame

In [11]:
# summary is a method: without the call parentheses the cell only echoes
# "<bound method DataFrame.summary ...>" and computes nothing.
# Calling it returns a new DataFrame of statistics, which show() prints as a table.
df.summary().show()

In [12]:
# describe is a method: without the call parentheses the cell only echoes
# "<bound method DataFrame.describe ...>" and computes nothing.
# Calling it returns a new DataFrame of statistics, which show() prints as a table.
df.describe().show()

# Define the Input Schema and Load Corpus

In [13]:
from pyspark.sql.types import (StructField, StructType, StringType, IntegerType)

In [14]:
# One (column name, Spark type) pair per field; every field is declared
# nullable and carries no metadata.
data_schema = [
    StructField(column_name, spark_type, True, None)
    for column_name, spark_type in (
        ("age", IntegerType()),
        ("name", StringType()),
    )
]

# Wrap the field list into the schema object that is handed to the reader.
struct_type = StructType(data_schema)

In [15]:
# Reload the same file, this time with the explicit schema attached to the
# reader instead of relying on type inference.
df = spark.read.schema(struct_type).json(file_input_path)

In [16]:
# Display the rows loaded with the explicit schema.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [17]:
# Print the schema again to check the type that "age" was loaded with.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



# View data in given column

In [18]:
# select() returns a new DataFrame; no rows are displayed until show() is called.
df.select(["age"])

DataFrame[age: int]

In [19]:
# Display only the "age" column.
df.select(["age"]).show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [20]:
# Display several columns at once by listing their names.
df.select(["age", "name"]).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



# Types of Data Column, Data Row and Others

In [21]:
# Indexing a DataFrame by column name yields a Column object, not the column's data.
df["age"]

Column<b'age'>

In [22]:
# Check the Python type of a single column reference.
type(df["age"])

pyspark.sql.column.Column

In [23]:
# First of the two rows returned by head(2).
df.head(2)[0]

Row(age=None, name='Michael')

In [24]:
# Check the Python type used to represent a single row.
type(df.head(2)[0])

pyspark.sql.types.Row

In [25]:
# select() yields a DataFrame even when only one column is requested.
type(df.select(["age"]))

pyspark.sql.dataframe.DataFrame

# Rename a column

In [26]:
# Existing column name and the label it should be displayed under.
old_name, new_name = "age", "Age (years old)"

# withColumnRenamed() returns a renamed copy; only that copy is displayed
# here and df is not reassigned.
df.withColumnRenamed(
    old_name,
    new_name,
).show()

+---------------+-------+
|Age (years old)|   name|
+---------------+-------+
|           null|Michael|
|             30|   Andy|
|             19| Justin|
+---------------+-------+



# Add a New Column Computed from an Existing Column

In [27]:
# Append a column holding twice the value of the old_name column.
# NOTE: old_name and new_name come from the renaming cell above, so the
# doubled values are displayed under the "Age (years old)" label.
df.withColumn(new_name, df[old_name] * 2).show()

+----+-------+---------------+
| age|   name|Age (years old)|
+----+-------+---------------+
|null|Michael|           null|
|  30|   Andy|             60|
|  19| Justin|             38|
+----+-------+---------------+



# Using SQL Queries to Retrieve Data

In [28]:
# Register the DataFrame as a temporary view named "people" so that SQL queries can refer to it.
df.createOrReplaceTempView("people")

In [29]:
# Run a SQL query against the "people" view and keep the returned object in result.
result = spark.sql("Select * From people")

In [30]:
# Echo the result object itself: this shows its column names and types, not its rows.
result

DataFrame[age: int, name: string]

In [31]:
# Display the rows returned by the SQL query.
result.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



# Load Corpus "appl_stock.csv"

In [32]:
# Point the input path at the stock CSV file in the same data directory.
file_input_path = f"{dir_input_data_path}appl_stock.csv"

## Sample Data

```
Date,Open,High,Low,Close,Volume,Adj Close
2010-01-04,213.429998,214.499996,212.38000099999996,214.009998,123432400,27.727039
2010-01-05,214.599998,215.589994,213.249994,214.379993,150476200,27.774976000000002
```

In [33]:
# Read the CSV with its first line treated as the header and with column
# types inferred from the data.
df = (
    spark.read
    .options(inferSchema=True, header=True)
    .csv(file_input_path)
)

## Review Input Corpus

In [34]:
# Peek at the first two rows as Row objects.
df.head(2)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002)]

In [35]:
# Display only the first two rows as a table.
df.show(2)

+-------------------+----------+----------+------------------+----------+---------+------------------+
|               Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+----------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|
+-------------------+----------+----------+------------------+----------+---------+------------------+
only showing top 2 rows



## Show Schema

In [36]:
# printSchema is a method: without the call parentheses the cell only echoes
# "<bound method DataFrame.printSchema ...>" instead of printing the schema tree.
df.printSchema()

## Filter with Given Condition 

### Condition in String

In [37]:
# The filter condition is given here as a SQL expression string.
df.filter("Close<500").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [38]:
# Keep the rows whose Close value is below 500 (condition written as a SQL
# expression string), then display only the Open and Close columns.
(
    df.filter("Close<500")
    .select(["Open", "Close"])
    .show()
)

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



### Condition with DataFrame Column

In [39]:
# Same filter as the string form, but with the condition built from a
# Column comparison; only the Open and Close columns are displayed.
(
    df.filter(df["Close"] < 500)
    .select(["Open", "Close"])
    .show()
)

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



### Conditions with multiple operators

In [40]:
# Combine two column comparisons with & (logical AND). Each comparison is
# wrapped in its own parentheses so it is evaluated before the & is applied.
df.filter(
    (df["Close"] < 200) & (df["Open"] > 200)
).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



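### Error occurs when the parentheses are omitted

In Python, `&` binds more tightly than `<` and `>`, so without parentheses the expression below first evaluates `200 & df["Open"]`, which is what raises the error.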

```
df.filter(df["Close"]<200 & df["Open"]>200).show()
```

Py4JError: An error occurred while calling o365.and. Trace:
py4j.Py4JException: Method and([class java.lang.Integer]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)