In [30]:
from pyspark.sql import SparkSession


In [31]:
# Start (or reuse) the Spark session that every later cell reads through.
spark = (
    SparkSession.builder
                .appName('spork')
                .getOrCreate()
)

In [32]:
# Load the sample people records from JSON and inspect the resulting schema.
df = (spark.read
           .json('../data/people.json'))
df.printSchema()

# show() takes optional arguments: n caps the number of rows printed and
# truncate=False prints every cell value in full.
df.show(n=3, truncate=False)

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+-------+
|age |name   |
+----+-------+
|null|Michael|
|30  |Andy   |
|19  |Justin |
+----+-------+



In [33]:
# Keep only the rows whose name begins with 'M', using the Column method
# startswith() to build the predicate (where() is an alias of filter()).
df.where(df['name'].startswith('M')).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
+----+-------+



In [19]:
# limit(n) returns a new DataFrame holding at most n rows; here only one row is shown.
df.limit(1).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
+----+-------+



In [20]:
# Project a single column: only 'name' is kept in the result.
df.select('name').show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [21]:
# The inverse of select: remove 'name' and keep every other column.
df.drop('name').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [22]:
# distinct() gives the same result as dropDuplicates() called without a column subset
# cache() is shorthand for persist() with the default storage level

In [25]:
# cache() only marks df for caching; the count() action is what actually runs
# the job, so it is the step that fills the cache for quick recall later.
df.cache().count()

3

In [26]:
# Count the rows again — presumably served from the cache filled by the
# previous cell (verify in the Spark UI).
df.count()

3

In [15]:
# Read the Walmart stock CSV: the first line is the header row and column
# types are inferred from the data instead of defaulting to strings.
df2 = (
    spark.read
         .options(header='true', inferSchema='true')
         .csv('../data/walmart_stock.csv')
)

In [24]:
# Persist the first five (Date, Open) rows as a Parquet dataset.
# mode('overwrite') keeps this cell re-runnable: with the default save mode
# the write raises an error as soon as ../data/goose.parquet already exists.
(
    df2.limit(5)
       .select('Date', 'Open')
       .write
       .mode('overwrite')
       .parquet('../data/goose.parquet')
)

In [25]:
ls

basics.ipynb


In [28]:
ls ../data/goose.parquet/

part-00000-4d84c95e-b776-4e75-a901-8b9e117a18b4-c000.snappy.parquet  _SUCCESS


In [29]:
# Read the Parquet dataset back in and display it to confirm the round trip.
df3 = spark.read.parquet('../data/goose.parquet')
df3.show()

+-------------------+------------------+
|               Date|              Open|
+-------------------+------------------+
|2012-01-03 00:00:00|         59.970001|
|2012-01-04 00:00:00|60.209998999999996|
|2012-01-05 00:00:00|         59.349998|
|2012-01-06 00:00:00|         59.419998|
|2012-01-09 00:00:00|         59.029999|
+-------------------+------------------+



In [None]:
# TODO
#
# Replace FILL_IN with your code. You will probably need multiple
# lines of code for this problem.

parquetDir = "/mnt/training/wikipedia/pagecounts/staging_parquet_en_only_clean/"

# Build the two row predicates against the unfiltered DataFrame, apply them
# in turn, and bring the matching rows back to the driver as a list.
pagecounts = spark.read.parquet(parquetDir)
is_english = pagecounts['project'] == 'en'
is_washington = pagecounts['article'].endswith('_Washington')
washingtons = pagecounts.filter(is_english).filter(is_washington).collect()

# Add up the request counts of the collected rows on the driver.
totalWashingtons = 0
for row in washingtons:
    totalWashingtons += row['requests']

print("Total Washingtons: {0:,}".format(len(washingtons)))
print("Total Washington Requests: {0:,}".format(totalWashingtons))

In [33]:
from pyspark.sql.functions import col
parquetDir = "/mnt/training/wikipedia/pagecounts/staging_parquet_en_only_clean/"

# Same query as the TODO cell, but the predicates no longer need a DataFrame
# handle: one is an SQL expression string, the other is built with col().
pagecounts = spark.read.parquet(parquetDir)
english_only = pagecounts.filter("project == 'en'")
washingtons = english_only.filter(col('article').endswith('_Washington')).collect()

# Add up the request counts of the collected rows on the driver.
totalWashingtons = 0
for row in washingtons:
    totalWashingtons += row['requests']

print("Total Washingtons: {0:,}".format(len(washingtons)))
print("Total Washington Requests: {0:,}".format(totalWashingtons))

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
+----+-------+



In [None]:
# Imported here so the cell does not fail with a NameError on unix_timestamp:
# no earlier cell in this notebook brings that function into scope.
from pyspark.sql.functions import col, unix_timestamp

# Rename "timestamp" to "capturedAt", then keep every existing column and
# append the capturedAt string parsed with the "yyyy-MM-dd HH:mm:ss" pattern.
# NOTE(review): initialDF is not defined anywhere in the visible notebook —
# presumably it comes from the original course material; confirm before running.
tempA = (
    initialDF
    .withColumnRenamed("timestamp", "capturedAt")
    .select(col("*"), unix_timestamp(col("capturedAt"), "yyyy-MM-dd HH:mm:ss"))
)
tempA.printSchema()

## Columns

In [36]:
# Importing everything from pyspark.sql.functions gives several ways to build
# Column objects. Later cells use explode() and split() via this star import.
# NOTE(review): the star import also shadows Python builtins of the same name
# (e.g. sum, min, max) for the rest of the kernel session.
from pyspark.sql.functions import *

# col(..) refers to a column by name.
columnC = col("requests")

# expr(..) parses an SQL expression string into a Column.
columnD = expr("a + 1")

# lit(..) wraps a literal (constant) value in a Column.
columnE = lit("abc")

# Print each Column's string representation (its repr, not its Python type).
print("columnC: {}".format(columnC))
print("columnD: {}".format(columnD))
print("columnE: {}".format(columnE))

columnC: Column<b'requests'>
columnD: Column<b'(a + 1)'>
columnE: Column<b'abc'>


In [57]:
from pyspark.sql.functions import col, expr, lit

# One chain from raw CSV to the final shape: keep Person and Sales, derive
# 'New Sales' from an SQL expression, and attach a constant string column.
dataframe = (
    spark.read
         .option('header', 'true')
         .option('inferSchema', 'true')
         .csv('../data/sales_info.csv')
         .select(col('Person'), col('Sales'))
         .withColumn('New Sales', expr('Sales + 1'))
         .withColumn('Constant', lit('2'))
)

# Print up to ten rows without truncating cell values.
dataframe.show(n=10, truncate=False)

+-------+-----+---------+--------+
|Person |Sales|New Sales|Constant|
+-------+-----+---------+--------+
|Sam    |200.0|201.0    |2       |
|Charlie|120.0|121.0    |2       |
|Frank  |340.0|341.0    |2       |
|Tina   |600.0|601.0    |2       |
|Amy    |124.0|125.0    |2       |
|Vanessa|243.0|244.0    |2       |
|Carl   |870.0|871.0    |2       |
|Sarah  |350.0|351.0    |2       |
|John   |250.0|251.0    |2       |
|Linda  |130.0|131.0    |2       |
+-------+-----+---------+--------+
only showing top 10 rows



In [64]:
# collect() brings every row back to the driver as a list of Row objects.
print('COLLECT:')
all_rows = dataframe.collect()
for record in all_rows:
    print(record)

# take(3) brings back only the first three rows.
print('\nTAKE 3:')
first_three = dataframe.take(3)
for record in first_three:
    print(record)

COLLECT:
Row(Person='Sam', Sales=200.0, New Sales=201.0, Constant='2')
Row(Person='Charlie', Sales=120.0, New Sales=121.0, Constant='2')
Row(Person='Frank', Sales=340.0, New Sales=341.0, Constant='2')
Row(Person='Tina', Sales=600.0, New Sales=601.0, Constant='2')
Row(Person='Amy', Sales=124.0, New Sales=125.0, Constant='2')
Row(Person='Vanessa', Sales=243.0, New Sales=244.0, Constant='2')
Row(Person='Carl', Sales=870.0, New Sales=871.0, Constant='2')
Row(Person='Sarah', Sales=350.0, New Sales=351.0, Constant='2')
Row(Person='John', Sales=250.0, New Sales=251.0, Constant='2')
Row(Person='Linda', Sales=130.0, New Sales=131.0, Constant='2')
Row(Person='Mike', Sales=750.0, New Sales=751.0, Constant='2')
Row(Person=' Chris', Sales=350.0, New Sales=351.0, Constant='2')

TAKE 3:
Row(Person='Sam', Sales=200.0, New Sales=201.0, Constant='2')
Row(Person='Charlie', Sales=120.0, New Sales=121.0, Constant='2')
Row(Person='Frank', Sales=340.0, New Sales=341.0, Constant='2')


In [70]:
# Show the full frame first, for comparison with the exploded result.
dataframe.show()

# split() cuts each Person value at every 'e' into an array of pieces;
# explode() then emits one output row per piece.
(
    dataframe
    .select(explode(split(col('Person'), 'e')))
    .show()
)

+-------+-----+---------+--------+
| Person|Sales|New Sales|Constant|
+-------+-----+---------+--------+
|    Sam|200.0|    201.0|       2|
|Charlie|120.0|    121.0|       2|
|  Frank|340.0|    341.0|       2|
|   Tina|600.0|    601.0|       2|
|    Amy|124.0|    125.0|       2|
|Vanessa|243.0|    244.0|       2|
|   Carl|870.0|    871.0|       2|
|  Sarah|350.0|    351.0|       2|
|   John|250.0|    251.0|       2|
|  Linda|130.0|    131.0|       2|
|   Mike|750.0|    751.0|       2|
|  Chris|350.0|    351.0|       2|
+-------+-----+---------+--------+

+------+
|   col|
+------+
|   Sam|
|Charli|
|      |
| Frank|
|  Tina|
|   Amy|
|   Van|
|   ssa|
|  Carl|
| Sarah|
|  John|
| Linda|
|   Mik|
|      |
| Chris|
+------+



In [77]:
# alias() renames the column in the result: Person is displayed as 'g'.
dataframe.select(col('Person').alias('g')).show()

+-------+
|      g|
+-------+
|    Sam|
|Charlie|
|  Frank|
|   Tina|
|    Amy|
|Vanessa|
|   Carl|
|  Sarah|
|   John|
|  Linda|
|   Mike|
|  Chris|
+-------+



In [78]:
# Explicit imports instead of `import *`, which shadows Python builtins such
# as sum/min/max with the Spark functions of the same name.
from pyspark.sql.functions import col, initcap, lower, regexp_replace

# NOTE(review): userhome and dbutils are not defined anywhere in the visible
# notebook (the recorded run ends in NameError on userhome) — presumably they
# are provided by the Databricks classroom setup; confirm before running.
sourceFile = "dbfs:/mnt/training/dataframes/people-with-dups.txt"
destFile = userhome + "/people.parquet"


# In case it already exists
dbutils.fs.rm(destFile, True)

spark.conf.set("spark.sql.shuffle.partitions", 8)

# Normalise the fields in place — names to initial-capital form, dashes
# stripped from ssn — so that rows differing only in those respects become
# identical and are removed by dropDuplicates().
# NOTE(review): dropDuplicates() runs after orderBy(); presumably the sort
# order is not preserved in the written output — confirm the sort is needed.
(spark.read
      .option('sep', ':')
      .option('header', 'true')
      .csv(sourceFile)
      .select(
          initcap(lower(col('firstName'))).alias('firstName'),
          initcap(lower(col('middleName'))).alias('middleName'),
          initcap(lower(col('lastName'))).alias('lastName'),
          'gender',
          'birthDate',
          'salary',
          regexp_replace(col('ssn'), r'(-)', '').alias('ssn'),
      )
      .orderBy('firstName', 'lastName')
      .dropDuplicates()
      .write
      .parquet(destFile)
)

NameError: name 'userhome' is not defined

In [79]:
# Explicit imports instead of `import *`, which shadows Python builtins such
# as sum/min/max with the Spark functions of the same name.
from pyspark.sql.functions import col, lower, regexp_replace

# NOTE(review): userhome and dbutils are not defined anywhere in the visible
# notebook (the recorded run ends in NameError on userhome) — presumably they
# are provided by the Databricks classroom setup; confirm before running.
sourceFile = "dbfs:/mnt/training/dataframes/people-with-dups.txt"
destFile = userhome + "/people.parquet"


# In case it already exists
dbutils.fs.rm(destFile, True)

spark.conf.set("spark.sql.shuffle.partitions", 8)

# Add lower-cased name columns and a dash-free ssn purely as comparison keys,
# de-duplicate on those keys plus gender/salary/birthDate, then drop the
# helper columns so the written rows keep their original values.
(spark.read
      .option('sep', ':')
      .option('header', 'true')
      .csv(sourceFile)
      .withColumn('lcFirst', lower(col('firstName')))
      .withColumn('lcMiddle', lower(col('middleName')))
      .withColumn('lcLast', lower(col('lastName')))
      .withColumn('fixSsn', regexp_replace(col('ssn'), r'(-)', ''))
      .dropDuplicates(['lcFirst', 'lcMiddle', 'lcLast', 'gender', 'salary', 'birthDate', 'fixSsn'])
      .drop('lcFirst', 'lcMiddle', 'lcLast', 'fixSsn')
      .write
      .parquet(destFile)
)


NameError: name 'userhome' is not defined