In [1]:
// because we'll need it later
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

/** Prints the number of records held by each partition of `df`.
  *
  * A header line is printed first; the per-partition counts are then
  * collected to the driver and printed one per line, prefixed with "* ".
  *
  * @param df the Dataset of rows whose partitions are counted
  */
def printRecordsPerPartition(df:org.apache.spark.sql.Dataset[Row]):Unit = {
  println("Per-Partition Counts:")
  // Replace the rows of every partition by a single element: that partition's row count.
  val countsPerPartition = df.rdd.mapPartitions(
    rows => Iterator.single(rows.size),
    preservesPartitioning = true
  )
  // Bring the counts back to the driver and print them.
  for (count <- countsPerPartition.collect()) {
    println(s"* $count")
  }
}

printRecordsPerPartition: (df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])Unit


In [2]:
// Print the pre-defined `spark` object; the recorded output shows it is a SparkSession.
println(spark)

org.apache.spark.sql.SparkSession@51c54b69


In [5]:
// Print the pre-defined `sc` object; the recorded output shows it is a SparkContext.
print(sc)


org.apache.spark.SparkContext@691ea435

In [6]:
// A reference to our tab-separated file
val csvFile = "file:///Users/navaro/zeppelin-0.7.3-bin-all/data/pageviews-by-second-tsv.bz2"

// First attempt: read the file with only the delimiter set (no header, no schema).
// Declared as `val` because no visible cell reassigns it.
val tempDF = spark.read       // The DataFrameReader
  .option("delimiter", "\t")  // Use tab delimiter (default is comma-separator)
  .csv(csvFile)               // Creates a DataFrame from CSV after reading in the file

csvFile = file:///Users/navaro/zeppelin-0.7.3-bin-all/data/pageviews-by-second-tsv.bz2
tempDF = [_c0: string, _c1: string ... 1 more field]


[_c0: string, _c1: string ... 1 more field]

In [7]:
// Show the schema of the header-less read: per the recorded output, the columns
// get the generated names _c0, _c1, _c2 and are all typed as string.
tempDF.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)



In [8]:
// Same tab-delimited read, this time taking the column names from the header line.
spark.read
  .options(Map(
    "delimiter" -> "\t",   // Tab-separated input (the default separator is a comma)
    "header"    -> "true"  // Use the first line as the column names
  ))
  .csv(csvFile)
  .printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- site: string (nullable = true)
 |-- requests: string (nullable = true)



In [9]:
// Tab-delimited read with a header line, letting Spark infer the column types.
spark.read
  .options(Map(
    "header"      -> "true",  // Use the first line as the column names
    "delimiter"   -> "\t",    // Tab-separated input (the default separator is a comma)
    "inferSchema" -> "true"   // Infer each column's data type from the data
  ))
  .csv(csvFile)
  .printSchema()

root                                                                            
 |-- timestamp: timestamp (nullable = true)
 |-- site: string (nullable = true)
 |-- requests: integer (nullable = true)



In [10]:
// Required for StructField, StringType, IntegerType, etc.
import org.apache.spark.sql.types._

// Hand-written schema for the pageviews file: three fields, each declared non-nullable.
val csvSchema = new StructType()
  .add("timestamp", StringType, nullable = false)
  .add("site", StringType, nullable = false)
  .add("requests", IntegerType, nullable = false)

csvSchema = StructType(StructField(timestamp,StringType,false), StructField(site,StringType,false), StructField(requests,IntegerType,false))


StructType(StructField(timestamp,StringType,false), StructField(site,StringType,false), StructField(requests,IntegerType,false))

In [11]:
// Read with the explicit schema instead of inferring one, then print the result.
// The recorded output reports every column as nullable = true even though
// csvSchema declares the fields non-nullable.
spark.read
  .schema(csvSchema)          // Column names and types come from csvSchema
  .options(Map(
    "header"    -> "true",    // Line #1 is a header, not data
    "delimiter" -> "\t"       // Tab-separated input (the default separator is a comma)
  ))
  .csv(csvFile)
  .printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- site: string (nullable = true)
 |-- requests: integer (nullable = true)



In [12]:
// Load the pageviews file using the header line and the explicit schema.
val csvDF = spark.read
  .schema(csvSchema)
  .options(Map("header" -> "true", "delimiter" -> "\t"))
  .csv(csvFile)

// Report the number of partitions, then the record count of each one.
printf("Partitions: %,d%n", csvDF.rdd.getNumPartitions)
printRecordsPerPartition(csvDF)
println("-" * 80)

Partitions: 8
Per-Partition Counts:
* 1077215                                                                       
* 999868
* 999737
* 999235
* 999441
* 985819
* 1002541
* 136144
--------------------------------------------------------------------------------


csvDF = [timestamp: string, site: string ... 1 more field]


[timestamp: string, site: string ... 1 more field]

In [13]:
val jsonFile = "file:///Users/navaro/zeppelin-0.7.3-bin-all/data/snapshot.json"

// No "inferSchema" option is set here: it is a CSV reader option. The JSON reader
// determines column names and data types from the data whenever no schema is supplied.
val wikiEditsDF = spark.read  // The DataFrameReader
  .json(jsonFile)             // Creates a DataFrame from JSON, inferring its schema

wikiEditsDF.printSchema()

root
 |-- channel: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- delta: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- geocoding: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- countryCode2: string (nullable = true)
 |    |-- countryCode3: string (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- stateProvince: string (nullable = true)
 |-- isAnonymous: boolean (nullable = true)
 |-- isNewPage: boolean (nullable = true)
 |-- isRobot: boolean (nullable = true)
 |-- isUnpatrolled: boolean (nullable = true)
 |-- namespace: string (nullable = true)
 |-- page: string (nullable = true)
 |-- pageURL: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- url: string (nullable = true)
 |-- user: string (nullable = true)
 |-- userURL: string (nullable = true)
 |-- wikipedia: string (nullable = true)
 

jsonFile = file:///Users/navaro/zeppelin-0.7.3-bin-all/data/snapshot.json
wikiEditsDF = [channel: string, comment: string ... 16 more fields]


[channel: string, comment: string ... 16 more fields]

In [14]:
// Hand-written schema for the wiki-edits JSON file. The two-argument `add`
// creates a nullable field, matching the `true` flag every field had before.
//
// NOTE(review): compared with the inferred schema printed for wikiEditsDF,
// this schema declares `delta` as Integer (inferred: long), `latitude` and
// `longitude` as Double (inferred: string), and adds a `wikipediaURL` field
// that the inferred schema does not contain - confirm these are intended.
val jsonSchema = new StructType()
  .add("channel", StringType)
  .add("comment", StringType)
  .add("delta", IntegerType)
  .add("flag", StringType)
  .add("geocoding", new StructType()
    .add("city", StringType)
    .add("country", StringType)
    .add("countryCode2", StringType)
    .add("countryCode3", StringType)
    .add("stateProvince", StringType)
    .add("latitude", DoubleType)
    .add("longitude", DoubleType))
  .add("isAnonymous", BooleanType)
  .add("isNewPage", BooleanType)
  .add("isRobot", BooleanType)
  .add("isUnpatrolled", BooleanType)
  .add("namespace", StringType)
  .add("page", StringType)
  .add("pageURL", StringType)
  .add("timestamp", StringType)
  .add("url", StringType)
  .add("user", StringType)
  .add("userURL", StringType)
  .add("wikipediaURL", StringType)
  .add("wikipedia", StringType)

jsonSchema = StructType(StructField(channel,StringType,true), StructField(comment,StringType,true), StructField(delta,IntegerType,true), StructField(flag,StringType,true), StructField(geocoding,StructType(StructField(city,StringType,true), StructField(country,StringType,true), StructField(countryCode2,StringType,true), StructField(countryCode3,StringType,true), StructField(stateProvince,StringType,true), StructField(latitude,DoubleType,true), StructField(longitude,DoubleType,true)),true), StructField(isAnonymous,BooleanType,true), StructField(isNewPage,BooleanType,true), StructField(isRobot,BooleanType,true), StructField(isUnpatrolled,BooleanType,true), StructField(namespace,StringType,true), StructField(page,StringType,true), StructField(pageURL,S...


StructType(StructField(channel,StringType,true), StructField(comment,StringType,true), StructField(delta,IntegerType,true), StructField(flag,StringType,true), StructField(geocoding,StructType(StructField(city,StringType,true), StructField(country,StringType,true), StructField(countryCode2,StringType,true), StructField(countryCode3,StringType,true), StructField(stateProvince,StringType,true), StructField(latitude,DoubleType,true), StructField(longitude,DoubleType,true)),true), StructField(isAnonymous,BooleanType,true), StructField(isNewPage,BooleanType,true), StructField(isRobot,BooleanType,true), StructField(isUnpatrolled,BooleanType,true), StructField(namespace,StringType,true), StructField(page,StringType,true), StructField(pageURL,StringType,true), StructField(timestamp,StringType,true), StructField(url,StringType,true), StructField(user,StringType,true), StructField(userURL,StringType,true), StructField(wikipediaURL,StringType,true), StructField(wikipedia,StringType,true))

In [15]:
// Read the JSON file with the hand-written schema (no inference) and print the resulting schema.
spark.read.schema(jsonSchema).json(jsonFile).printSchema()

root
 |-- channel: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- delta: integer (nullable = true)
 |-- flag: string (nullable = true)
 |-- geocoding: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- countryCode2: string (nullable = true)
 |    |-- countryCode3: string (nullable = true)
 |    |-- stateProvince: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- isAnonymous: boolean (nullable = true)
 |-- isNewPage: boolean (nullable = true)
 |-- isRobot: boolean (nullable = true)
 |-- isUnpatrolled: boolean (nullable = true)
 |-- namespace: string (nullable = true)
 |-- page: string (nullable = true)
 |-- pageURL: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- url: string (nullable = true)
 |-- user: string (nullable = true)
 |-- userURL: string (nullable = true)
 |-- wikipediaURL: string (nullable = true)
 |-- wikipedia: string (nullable = true)

In [16]:
// Load the wiki-edits JSON file using the hand-written schema.
val jsonDF = spark.read.schema(jsonSchema).json(jsonFile)

// Report the number of partitions, then the record count of each one.
printf("Partitions: %,d%n", jsonDF.rdd.getNumPartitions)
printRecordsPerPartition(jsonDF)
println("-" * 80)

Partitions: 1
Per-Partition Counts:
* 1
--------------------------------------------------------------------------------


jsonDF = [channel: string, comment: string ... 16 more fields]


[channel: string, comment: string ... 16 more fields]

In [18]:
val textFile = "file:///Users/navaro/zeppelin-0.7.3-bin-all/data/tom.txt"

// Read the file as raw text and print the schema; the recorded output shows
// a single string column named `value`.
spark.read.text(textFile).printSchema()

root
 |-- value: string (nullable = true)



textFile = file:///Users/navaro/zeppelin-0.7.3-bin-all/data/tom.txt


lastException: Throwable = null


file:///Users/navaro/zeppelin-0.7.3-bin-all/data/tom.txt

In [19]:
val parquetFile = "file:///Users/navaro/zeppelin-0.7.3-bin-all/data/pagecounts-parquet/"

// Read the Parquet directory without supplying a schema and print the schema that comes back.
spark.read.parquet(parquetFile).printSchema()

root
 |-- project: string (nullable = true)
 |-- article: string (nullable = true)
 |-- requests: integer (nullable = true)
 |-- bytes_served: long (nullable = true)



parquetFile = file:///Users/navaro/zeppelin-0.7.3-bin-all/data/pagecounts-parquet/


file:///Users/navaro/zeppelin-0.7.3-bin-all/data/pagecounts-parquet/

In [20]:
// Hand-written schema for the pagecounts Parquet data: four fields, each declared non-nullable.
val parquetSchema = new StructType()
  .add("project", StringType, nullable = false)
  .add("article", StringType, nullable = false)
  .add("requests", IntegerType, nullable = false)
  .add("bytes_served", LongType, nullable = false)

// Read with the explicit schema and print it. The recorded output reports every
// column as nullable = true despite the non-nullable declarations above.
spark.read.schema(parquetSchema).parquet(parquetFile).printSchema()

root
 |-- project: string (nullable = true)
 |-- article: string (nullable = true)
 |-- requests: integer (nullable = true)
 |-- bytes_served: long (nullable = true)



parquetSchema = StructType(StructField(project,StringType,false), StructField(article,StringType,false), StructField(requests,IntegerType,false), StructField(bytes_served,LongType,false))


StructType(StructField(project,StringType,false), StructField(article,StringType,false), StructField(requests,IntegerType,false), StructField(bytes_served,LongType,false))

In [21]:
// Load the pagecounts Parquet data using the hand-written schema.
val parquetDF = spark.read
  .schema(parquetSchema)
  .parquet(parquetFile)

// Report the number of partitions, then the record count of each one.
printf("Partitions: %,d%n", parquetDF.rdd.getNumPartitions)
printRecordsPerPartition(parquetDF)
println("-" * 80)

Partitions: 8
Per-Partition Counts:
* 1161100                                                                       
* 1111411
* 999869
* 724384
* 725313
* 625841
* 536227
* 386797
--------------------------------------------------------------------------------


parquetDF = [project: string, article: string ... 2 more fields]


[project: string, article: string ... 2 more fields]