In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t

In [2]:
# Create (or reuse) the Spark session from `conf` instead of depending on a
# `sc` variable injected by the pyspark shell, which a fresh kernel lacks.
conf = SparkConf()
ss = SparkSession.builder.config(conf=conf).getOrCreate()
sc = ss.sparkContext
# Expose the JVM class com.github.silentsokolov.App to Spark SQL as `parseUrl`,
# returning array<string> (nullable elements).
# NOTE(review): the jar containing that class must be on the Spark classpath
# (e.g. --jars / spark.jars) - confirm how it is supplied.
ss.udf.registerJavaFunction("parseUrl", "com.github.silentsokolov.App", t.ArrayType(t.StringType(), True))

In [3]:
# Benchmark input: column `_c4` of the tab-separated data (NOTE(review):
# presumably the URL column - confirm), nulls dropped, capped at SAMPLE_ROWS.
SAMPLE_ROWS = 1000000

df = ss.read.csv(['/path/to/data'], sep='\t')
df = (
    df.select(f.col('_c4'))
    .filter(f.col('_c4').isNotNull())
    .limit(SAMPLE_ROWS)
    .cache()
)
# cache() is lazy: count() forces the whole sample into the cache now, so no
# %timeit loop below also pays for reading the CSV (show() alone is not
# guaranteed to compute every row).
df.count()
df.show()

In [4]:
from urllib.parse import urlparse

@f.udf(t.ArrayType(t.StringType(), True))
def parse_py(v):
    """Split a URL into ``[scheme, domain, path]`` in pure Python (Spark UDF).

    Intended to mirror the ``parse_url``-based pipeline benchmarked below:

    * ``domain`` is ``urlparse(v).netloc`` with a single *leading* ``www.``
      removed, matching the anchored ``regexp_replace`` used there. Being the
      netloc, it may still include ``user@`` or ``:port`` parts.
    * ``path`` keeps its separators: the path, then ``?query`` and
      ``#fragment`` only when those components are non-empty.

    Returns ``None`` (a null array in Spark) when ``v`` is not a string
    (e.g. a null cell) or when ``urlparse`` raises ``ValueError`` on a
    malformed URL.
    """
    if not isinstance(v, str):
        # Null / non-string input yields a null array instead of an error.
        return None
    try:
        o = urlparse(v)
    except ValueError:
        # urlparse rejects some malformed URLs (e.g. an unbalanced IPv6 "[").
        return None
    netloc = o.netloc
    # Strip "www." only as a prefix, not wherever it occurs in the host.
    domain = netloc[len('www.'):] if netloc.startswith('www.') else netloc
    path = o.path
    if o.query:
        path += '?' + o.query
    if o.fragment:
        path += '#' + o.fragment
    return [o.scheme, domain, path]

In [5]:
%timeit df.withColumn('v', parse_py(f.col('_c4'))).agg(f.count(f.col('v')[0])).show()

15.9 s ± 618 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
%%timeit

df.select(
    f.expr('parse_url(_c4, "PROTOCOL")').alias('scheme'),
    f.regexp_replace(
        f.expr('parse_url(_c4, "HOST")'),
        r'^(www\.)',
        ''
    ).alias('domain'),
    f.when(
        f.expr('parse_url(_c4, "REF")').isNull(),
        f.expr('parse_url(_c4, "FILE")')
    ).otherwise(
        f.concat(
            f.expr('parse_url(_c4, "FILE")'),
            f.lit('#'),
            f.expr('parse_url(_c4, "REF")')
        )
    ).alias('path'),
).withColumn('v', f.array(f.col('scheme'), f.col('domain'), f.col('path'))).agg(f.count(f.col('v')[0])).show()

9.16 s ± 408 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [7]:
%%timeit

df.withColumn('v', f.expr('parseUrl(_c4)')).agg(f.count(f.col('v')[0])).show()

8.11 s ± 199 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
