In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, UserDefinedFunction
from pyspark.sql.types import StringType

import my_pyspark_parser as parser

In [2]:
# --- Input locations -------------------------------------------------------
# Directory holding the raw XML dumps, and the dump file names.
IN_PATH = 'datasets/raw data/'
IN_FILE = 'en_wiki_complete.xml'
# Small sample dump; this is the file the load cell below actually reads.
TEST_FILE = 'wiki_sample.xml'

# --- Output locations ------------------------------------------------------
# Base directory for results; the final write cell appends its folder name to it.
WIKI_PATH = 'datasets/'
WIKI_FILE = 'output_wiki_en_02.csv'

In [3]:
# Location of the spark-xml connector jar. Defined once so that every setting
# below is guaranteed to point at the same file.
# NOTE(review): this is an absolute, machine-specific path -- consider making
# it configurable before sharing the notebook.
SPARK_XML_JAR = 'file:///D://Programy//vinf//lib//spark-xml_2.12-0.14.0.jar'

# Create (or reuse) the Spark session with the connector jar registered on the
# driver and the executors.
# NOTE(review): 'spark.executor.extraLibrary' does not appear in Spark's
# configuration reference (the documented names are
# 'spark.executor.extraLibraryPath' / 'spark.driver.extraLibraryPath', which
# are meant for native-library directories rather than jars). The key is kept
# unchanged here to preserve behaviour -- confirm whether it can be removed.
spark = (
    SparkSession.builder.appName('VINF Projekt')
    .config('spark.jars', SPARK_XML_JAR)
    .config('spark.executor.extraClassPath', SPARK_XML_JAR)
    .config('spark.executor.extraLibrary', SPARK_XML_JAR)
    .config('spark.driver.extraClassPath', SPARK_XML_JAR)
    .getOrCreate()
)

In [4]:
# Echo the session object as the cell's last expression so the notebook
# displays it (quick check that the session was created).
spark

In [24]:
# Tag names handed to the spark-xml reader: the document's root element and
# the element that is read as one row.
root = 'mediawiki'
row = 'page'

# Explicit schema: only the page id, the title and the text nested inside the
# revision element are read, all as nullable strings.
schema = StructType([
    StructField('id', StringType(), True),
    StructField('title', StringType(), True),
    StructField('revision', StructType([
        StructField('text', StringType(), True),
    ])),
])

# Read the sample dump with the spark-xml data source using the schema above.
df = (
    spark.read.format('com.databricks.spark.xml')
    .options(rootTag=root, rowTag=row)
    .schema(schema)
    .load(IN_PATH + TEST_FILE)
)

In [25]:
# Flatten the nested `revision` struct into a plain string column and expose
# it under the name `text`.
# NOTE(review): the cast stringifies the whole struct, so each value comes out
# wrapped in brackets (visible in the `df.show` output further down, e.g.
# "[lol]"). Selecting the nested field (`revision.text`) would avoid the
# wrapper, but the downstream parser may rely on the current form -- confirm
# before changing.
df = (
    df.withColumn("revision", col("revision").cast("String"))
      .withColumnRenamed("revision", "text")
)

In [26]:
# Print df's schema tree (column names, types, nullability) to verify that
# the frame now consists of three flat string columns.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)



In [27]:
# Preview the first ten rows (long values are truncated in the display).
df.show(n=10)

+--------+--------------------+--------------------+
|      id|               title|                text|
+--------+--------------------+--------------------+
|   15823|       Joseph Conrad|[{{Short descript...|
| 5593936|          Template:'|              [peen]|
|55882557|Template:20th Cen...|[{{#if: {{{FID<in...|
| 2557713|       Template:Abbr|               [lol]|
| 1501273|      Template:Audio|            [yeezus]|
|23327809|Template:Authorit...|[{{#invoke:Author...|
| 6594285| Template:Birth date|[<span style="dis...|
|  994397| Template:Blockquote|[<templatestyles ...|
|58109865|Template:Blockquo...|[/* {{pp-template...|
|27461703|Template:Br separ...|[{{<includeonly>s...|
+--------+--------------------+--------------------+
only showing top 10 rows



In [28]:
# Wrap the parser's page handler as a Spark UDF producing a string column.
# Built through the `udf` factory: PySpark documents that the
# UserDefinedFunction constructor is not meant to be called directly.
my_udf = udf(parser.save_page, StringType())

# Replace each page's raw text with the parser's result, then discard every
# row that contains a null in any column.
df_new = df.withColumn('text', my_udf('text')).na.drop()

In [None]:
# Collapse the result to a single partition so that one CSV part file is
# produced, then write it (with a header row) under the output directory.
(
    df_new.coalesce(1)
    .write.format('com.databricks.spark.csv')
    .option('header', 'true')
    .save(WIKI_PATH + "spark_output")
)