In [1]:
# The following Spark basics are performed on people.json
!pip install pyspark


Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/8e/b0/bf9020b56492281b9c9d8aae8f44ff51e1bc91b3ef5a884385cb4e389a40/pyspark-3.0.0.tar.gz (204.7MB)
[K     |████████████████████████████████| 204.7MB 60kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 17.8MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044182 sha256=c085b960f607383d5c7cc3fd1804aac5b30796a1d042c1411032db9da63d7969
  Stored in directory: /root/.cache/pip/wheels/57/27/4d/ddacf7143f8d5b76c45c61ee2e43d9f8492fc5a8e78ebd7d37
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0


In [2]:
from pyspark.sql import SparkSession

In [3]:
# Build the SparkSession used by every cell below; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

In [7]:
# Load people.json into a DataFrame. No schema is passed here, so Spark infers
# the column types from the data.
# NOTE(review): absolute path at the filesystem root - presumably the file was
# uploaded there for this session; confirm before running elsewhere.
df = spark.read.json('/people.json')

In [8]:
# Print the DataFrame's rows as a text table (missing values appear as null).
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [9]:
# Print each column's name, inferred type and nullability as a tree.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [10]:
# Column names as a plain Python list of strings.
df.columns

['age', 'name']

In [11]:
# Without call parentheses this only references the method: the cell displays
# the bound-method repr and no statistics are computed.
df.describe

<bound method DataFrame.describe of DataFrame[age: bigint, name: string]>

In [12]:
# describe() returns a new DataFrame of summary statistics (count, mean,
# stddev, min, max); show() prints it. Null ages are excluded from the count,
# and the string column only gets count/min/max.
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [13]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [14]:
# Explicit schema definition: one StructField(name, data type, nullable) per column.
data_schema = [
    StructField('age', IntegerType(), True),
    StructField('name', StringType(), True),
]

In [15]:
# Wrap the list of fields in a StructType so it can be handed to the reader as a schema.
final_struc = StructType(data_schema)

In [17]:
# Re-read the same file, this time applying the explicit schema instead of
# relying on inference. This rebinds `df` for all following cells.
df = spark.read.json(
    '/people.json',
    schema=final_struc,
)

In [18]:
# Confirm the explicit schema took effect: age is now reported as integer.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [19]:
# Indexing by column name yields a Column object (a reference to the column),
# not the column's values.
df['name']

Column<b'name'>

In [21]:
# Show the Python type returned by column indexing.
type(df['age'])

pyspark.sql.column.Column

In [22]:
# select() returns a new DataFrame holding only the requested column; without
# show() the cell displays just its schema summary, not any rows.
df.select('age')

DataFrame[age: int]

In [23]:
# Project a single column and print its rows.
df.select('name').show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [24]:
# Project several columns; the output follows the order given to select().
df.select('name', 'age').show()

+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+



In [25]:
# head(3) brings the first three rows back as Row objects; index 2 picks the third.
df.head(3)[2]

Row(age=19, name='Justin')

In [29]:
# Add a new column derived from an existing one. withColumn() returns a new
# DataFrame (df itself is left unchanged), and the ** operator produces
# floating-point values, with null propagated for a null age.
df.withColumn('age_square', df['age']**2).show()

+----+-------+----------+
| age|   name|age_square|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|     900.0|
|  19| Justin|     361.0|
+----+-------+----------+



In [31]:
# Rename a column. This also returns a new DataFrame; df keeps its original
# 'name' column because the result is only shown, not assigned.
df.withColumnRenamed('name', 'first_name').show()

+----+----------+
| age|first_name|
+----+----------+
|null|   Michael|
|  30|      Andy|
|  19|    Justin|
+----+----------+



In [33]:
# Run SQL against the DataFrame: register it as a temporary view named
# 'people', then query that view through the SparkSession.
df.createOrReplaceTempView('people')
# String literals are single-quoted (standard SQL). Double-quoted text can be
# parsed as an identifier when ANSI double-quoted identifiers are enabled.
results = spark.sql("select * from people where (name = 'Michael' or name = 'Andy')")
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
+----+-------+

