## Processing XML with Spark

In [42]:
# Root directories: XML datasets (read/written below) and the original CSV datasets
datasets, datasets_csv = 'data/ml-latest-xml', 'data/ml-latest'

### Process XML with DataFrames

In [36]:
%%time
df_ratings_xml = spark.read.format("com.databricks.spark.xml") \
          .options(rowTag='rating').load(datasets + '/ratings.xml')

Wall time: 126 ms


In [37]:
# Print the first five rows of the parsed ratings DataFrame
df_ratings_xml.show(n=5)

+-------+------+----------+------+
|movieId|rating| timestamp|userId|
+-------+------+----------+------+
|    169|   2.5|1204927694|     1|
|   2471|   3.0|1204927438|     1|
|  48516|   5.0|1204927435|     1|
|   2571|   3.5|1436165433|     2|
| 109487|   4.0|1436165496|     2|
+-------+------+----------+------+
only showing top 5 rows



In [21]:
# Keep only ratings strictly greater than 3 and print the first five of them
df_ratings_xml.filter(df_ratings_xml['rating'] > 3).show(5)

+-------+------+----------+------+
|movieId|rating| timestamp|userId|
+-------+------+----------+------+
|  48516|   5.0|1204927435|     1|
|   2571|   3.5|1436165433|     2|
| 109487|   4.0|1436165496|     2|
| 112552|   5.0|1436165496|     2|
| 112556|   4.0|1436165499|     2|
+-------+------+----------+------+
only showing top 5 rows



In [27]:
# Save the ratings equal to 5 as XML: one <rating> element per row,
# all wrapped in a single <ratings> root element.
# mode('overwrite') keeps the cell re-runnable: Spark's default save mode
# raises an error when the target path already exists.
(df_ratings_xml
    .where('rating = 5')
    .write
    .format('com.databricks.spark.xml')
    .mode('overwrite')
    .options(rowTag='rating', rootTag='ratings')
    .save(datasets + '/ratings-5.xml'))

### Process XML with Spark SQL

In [23]:
# (Re)create a SQL table read directly from the XML ratings; each <rating> element is one row
spark.sql('DROP TABLE IF EXISTS ratings_xml')
spark.sql('''CREATE TABLE ratings_xml
USING com.databricks.spark.xml
OPTIONS (path "{}/ratings.xml", rowTag "rating")
'''.format(datasets))

DataFrame[]

In [24]:
# Peek at the first five rows of the ratings_xml table via SQL
spark.sql('''
SELECT *
FROM ratings_xml
LIMIT 5
''').show()

+-------+------+----------+------+
|movieId|rating| timestamp|userId|
+-------+------+----------+------+
|    169|   2.5|1204927694|     1|
|   2471|   3.0|1204927438|     1|
|  48516|   5.0|1204927435|     1|
|   2571|   3.5|1436165433|     2|
| 109487|   4.0|1436165496|     2|
+-------+------+----------+------+



In [28]:
# Define a SQL table over the 5-star ratings XML data.
# The target path must already exist (the DataFrame save cell above writes it).

spark.sql('DROP TABLE IF EXISTS ratings_5_xml')
spark.sql('''CREATE TABLE ratings_5_xml
USING com.databricks.spark.xml
OPTIONS (path "{}/ratings-5.xml", rowTag "rating")
'''.format(datasets))

DataFrame[]

In [29]:
# Replace the table's contents with every 5-star rating from ratings_xml
spark.sql('''
INSERT OVERWRITE TABLE ratings_5_xml
SELECT * FROM ratings_xml
WHERE rating = 5
''')

DataFrame[]

In [30]:
# Inspect the first five rows of the rewritten 5-star table
spark.sql('''
SELECT *
FROM ratings_5_xml
LIMIT 5
''').show()

+-------+------+----------+------+
|movieId|rating| timestamp|userId|
+-------+------+----------+------+
|  48516|   5.0|1204927435|     1|
| 112552|   5.0|1436165496|     2|
|   2431|   5.0| 920586945|     3|
|     94|   5.0|1037740486|     4|
|    538|   5.0|1037740474|     4|
+-------+------+----------+------+



### Preparation

This section creates the XML version of the ratings data from `ratings.csv`.
It only needs to run once, before the cells above; on a first read you can skip it.

In [73]:
# Remove target directory if exists.
! rm -rf $datasets/ratings

In [44]:
%%time
# Read the CSV file
df_ratings = spark.read.csv(datasets_csv + '/ratings.csv', inferSchema=True, header=True)

# Register the ratings CSV data as Spark SQL view
df_ratings.createOrReplaceTempView('ratings')
df_ratings.show(3)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    169|   2.5|1204927694|
|     1|   2471|   3.0|1204927438|
|     1|  48516|   5.0|1204927435|
+------+-------+------+----------+
only showing top 3 rows

Wall time: 12.7 s


In [61]:
%time df_ratings.write \
    .format('com.databricks.spark.xml') \
    .options(rowTag='rating', rootTag='ratings') \
    .save(datasets + '/ratings')

Wall time: 1min 50s


In [49]:
# How big is our XML dataset?
! du -h $datasets/ratings

    3.46GB data/ml-latest-xml/ratings
