# Spark DataFrame Basics

In [1]:
import os

from pyspark.sql import SparkSession

# Master URL of the standalone cluster. Set the SPARK_MASTER_URL environment
# variable (e.g. to "local[*]") to run this notebook against another cluster;
# when it is unset the original cluster URL is used.
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://spark:7077")

# getOrCreate() reuses an already-active session in this kernel if there is
# one, so re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .master(SPARK_MASTER_URL)
    .appName("Basics")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/21 01:45:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the sample people records from JSON. No schema is given here, so Spark
# infers the column types from the file contents.
df = spark.read.format("json").load("/data/people.json")

                                                                                

In [3]:
# Render the DataFrame as a text table (explicit defaults: first 20 rows,
# long cell values truncated).
df.show(n=20, truncate=True)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [None]:
# Print the schema Spark inferred when reading the JSON above
# (no explicit schema was passed to that read).
df.printSchema()

In [None]:
# Column names, in schema order, read straight off the DataFrame's schema.
# As the cell's last expression, the list is displayed.
df.schema.fieldNames()

In [None]:
# Compute descriptive statistics per column and print them as a table.
(
    df.describe()
    .show(n=20, truncate=True)
)

In [None]:
from pyspark.sql.types import (StructField, 
                               StringType, 
                               IntegerType, 
                               StructType)

In [None]:
# Explicit schema: one StructField per column, given as
# (name, data type, nullable). Both columns are declared nullable.
data_schema = [
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True),
]

In [None]:
# Wrap the list of fields in a StructType, the schema object passed to the
# JSON reader below.
final_struct = StructType(data_schema)

In [None]:
# Re-read the same file as the first read, this time with the explicit schema
# instead of letting Spark infer one. Use the same absolute path as before:
# a relative "data/people.json" would be resolved against the current working
# directory rather than the /data location loaded earlier.
df = spark.read.json(path="/data/people.json", schema=final_struct)

In [None]:
# Print the schema again: it should now reflect final_struct
# (age as IntegerType, name as StringType) rather than the inferred one.
df.printSchema()

In [None]:
# .select returns a new DataFrame containing just the chosen column;
# here the column is referenced as a Column object rather than by string.
df.select(df["age"]).show()

In [None]:
# Fetch the first two rows as a list of Row objects, then index the list to
# pick out the first one; the notebook displays that Row.
df.take(2)[0]

In [None]:
# Select several columns at once by passing each name as its own argument,
# then show the resulting two-column DataFrame.
df.select("age", "name").show()

In [None]:
# Add a derived "double_age" column holding twice the age.
# withColumn returns a new DataFrame; df itself is not modified in place.
df.withColumn("double_age", df.age * 2).show()

In [None]:
# Rename the "age" column to "Age".
# (Returns a new DataFrame; df itself is not modified in place.)
(
    df
    .withColumnRenamed(existing="age", new="Age")
    .show()
)

In [None]:
# Use pure SQL with DataFrames.
# Register df as a temporary view named "people" so SQL queries can refer to
# it; an existing view with that name is replaced.
df.createOrReplaceTempView(name="people")

In [None]:
# Query the registered "people" view with plain SQL; the result is a DataFrame.
results = spark.sql(
    "SELECT * FROM people"
)

In [None]:
# Display the full query result (explicit show defaults).
results.show(n=20, truncate=True)

In [None]:
# Filter in SQL. Rows with a null age do not satisfy the comparison either,
# so they are excluded along with ages of 20 or less.
new_results = spark.sql(
    "SELECT * FROM people WHERE age>20"
)

In [None]:
# Display the filtered rows (explicit show defaults).
new_results.show(n=20, truncate=True)