In [1]:
# Imports
import os
import sys
from pyspark.sql import SparkSession
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col
spark = SparkSession.builder.master("local[1]") \
    .appName("SparkByExamples.com").getOrCreate()

In [2]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

# Create DataFrame
df = spark.createDataFrame(data=data,schema=columns)
df.show()

# foreach() Example
def f(df):
    print(df.Seqno)
df.foreach(f)

+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+



In [3]:
df=spark.range(100)
print(df.sample(True,0.3,123).collect())
print(df.sample(0.1,456).collect())

[Row(id=0), Row(id=5), Row(id=9), Row(id=11), Row(id=14), Row(id=14), Row(id=16), Row(id=17), Row(id=21), Row(id=29), Row(id=33), Row(id=41), Row(id=42), Row(id=52), Row(id=52), Row(id=54), Row(id=58), Row(id=65), Row(id=65), Row(id=71), Row(id=76), Row(id=79), Row(id=85), Row(id=96)]
[Row(id=19), Row(id=21), Row(id=42), Row(id=48), Row(id=49), Row(id=50), Row(id=75), Row(id=80)]


In [4]:
df2=df.select((df.id % 3).alias("key"))
print(df2.sampleBy("key", {0: 0.1, 1: 0.2},0).collect())


[Row(key=0), Row(key=1), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=1)]


In [5]:
df=spark.read.option("header",True) \
        .csv("simple-zipcodes.csv")
df.printSchema()


root
 |-- RecordNumber: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- State: string (nullable = true)



In [6]:
df.write.option("header",True) \
        .partitionBy("state") \
        .mode("overwrite") \
        .csv("zipcodes-state")

In [9]:
df.write.option("header",True) \
        .partitionBy("country") \
        .mode("overwrite") \
        .csv("zipcodes-country")

In [12]:
df.repartition(2)\
        .write.option("header",True) \
        .partitionBy("state") \
        .mode("overwrite") \
        .csv("zipcodes-state-more")

In [16]:
df.write.option("header",True) \
        .option("maxRecordsPerFile", 2) \
        .partitionBy("state") \
        .mode("overwrite") \
        .csv("zipcodes-state")

In [24]:
df.write.option("header",True) \
        .partitionBy("state","city") \
        .mode("overwrite") \
        .csv("zipcodes-state")

In [26]:
dfSinglePart=spark.read.option("header",True) \
            .csv("zipcodes-state/state=AL/city=SPRINGVILLE")
dfSinglePart.printSchema()
dfSinglePart.show()

root
 |-- RecordNumber: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Zipcode: string (nullable = true)

+------------+-------+-------+
|RecordNumber|Country|Zipcode|
+------------+-------+-------+
|       54355|     US|  35146|
+------------+-------+-------+



In [39]:
# Read from partitioned data using sql
parqDF = spark.read.option("header",True) \
                  .csv("zipcodes-state")
parqDF.createOrReplaceTempView("ZIPCODE")
spark.sql("select * from ZIPCODE  where state='AL' and city='SPRINGVILLE'") \
    .show()

+------------+-------+-------+-----+-----------+
|RecordNumber|Country|Zipcode|state|       city|
+------------+-------+-------+-----+-----------+
|       54355|     US|  35146|   AL|SPRINGVILLE|
+------------+-------+-------+-----+-----------+



In [None]:
print(.sample(0.1,123).collect())
