In [3]:
import numpy as np
import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.functions import array, array_contains, count, explode, lit, sum
from pyspark.sql.types import ArrayType, DoubleType, StructType, StructField, StringType, IntegerType, LongType

# Enable Arrow-based columnar data transfers
#spark.conf.set("io.netty.tryReflectionSetAccessible", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

schema = StructType([
    StructField("created", LongType(), True),
    StructField("d1", StringType(), True),
    StructField("d2", StringType(), True),
    StructField("dir", StringType(), True),
    StructField(
        "files",
        ArrayType(
            StructType([
                StructField("bitrate", StringType(), True),
                StructField("btih", StringType(), True),
                StructField("crc32", StringType(), True),
                StructField("format", StringType(), True),
                StructField("height", StringType(), True),
                StructField("length", StringType(), True),
                StructField("license", StringType(), True),
                StructField("md5", StringType(), True),
                StructField("mtime", StringType(), True),
                StructField("name", StringType(), True),
                StructField("original", StringType(), True),
                StructField("rotation", StringType(), True),
                StructField("sha1", StringType(), True),
                StructField("size", StringType(), True),
                StructField("source", StringType(), True),
                StructField("title", StringType(), True),
                StructField("track", StringType(), True),
                StructField("width", StringType(), True)
            ]), True), 
        True),
    StructField("files_count", LongType(), True),
    StructField("identifier", StringType(), True),
    StructField("item_last_updated", LongType(), True),
    StructField("item_size", LongType(), True),
    StructField(
        "metadata",
        StructType([
            StructField("Length", StringType(), True),
            StructField("addeddate", StringType(), True),
            StructField("adder", StringType(), True),
            StructField("aspect_ratio", StringType(), True),
            StructField("backup_location", StringType(), True),
            StructField("closed_captioning", StringType(), True),
            StructField("collection", ArrayType(StringType(), True), True),
            StructField("color", StringType(), True),
            StructField("contact", StringType(), True),
            StructField("coverage", StringType(), True),
            StructField("creator", StringType(), True),
            StructField("credits", StringType(), True),
            StructField("curation", StringType(), True),
            StructField("date", StringType(), True),
            StructField("description", StringType(), True),
            StructField("director", StringType(), True),
            StructField("duration", StringType(), True),
            StructField("format", StringType(), True),
            StructField("genre", StringType(), True),
            StructField("glob", StringType(), True),
            StructField("holder", StringType(), True),
            StructField("ia_orig__runtime", StringType(), True),
            StructField("identifier", StringType(), True),
            StructField("identifier-access", StringType(), True),
            StructField("identifier-ark", StringType(), True),
            StructField("imdb", StringType(), True),
            StructField("keywords", StringType(), True),
            StructField("language", StringType(), True),
            StructField("lcenseurl", StringType(), True),
            StructField("license", StringType(), True),
            StructField("licenseurl", StringType(), True),
            StructField("licensurl", StringType(), True),
            StructField("mediatype", StringType(), True),
            StructField("noarchivetorrent", StringType(), True),
            StructField("ocr", StringType(), True),
            StructField("omp-locally-produced", StringType(), True),
            StructField("omp-project", StringType(), True),
            StructField("own", StringType(), True),
            StructField("pbcore-genre", StringType(), True),
            StructField("pick", StringType(), True),
            StructField("ppi", StringType(), True),
            StructField("presenter", StringType(), True),
            StructField("producer", StringType(), True),
            StructField("publicdate", StringType(), True),
            StructField("publisher", StringType(), True),
            StructField("release_date", StringType(), True),
            StructField("repub_state", StringType(), True),
            StructField("resource", StringType(), True),
            StructField("runtime", StringType(), True),
            StructField("scanner", StringType(), True),
            StructField("segments", StringType(), True),
            StructField("series", StringType(), True),
            StructField("sound", StringType(), True),
            StructField("sponsor", StringType(), True),
            StructField("subject", StringType(), True),
            StructField("title", StringType(), True),
            StructField("tv-parental-guidelines", StringType(), True),
            StructField("updatedate", StringType(), True),
            StructField("updater", StringType(), True),
            StructField("upload_application", StringType(), True),
            StructField("uploader", StringType(), True),
            StructField("vimeo-height", StringType(), True),
            StructField("vimeo-id", StringType(), True),
            StructField("vimeo-n-entries", StringType(), True),
            StructField("vimeo-playlist", StringType(), True),
            StructField("vimeo-playlist-index", StringType(), True),
            StructField("vimeo-uploader", StringType(), True),
            StructField("vimeo-uploader-id", StringType(), True),
            StructField("vimeo-view-count", StringType(), True),
            StructField("vimeo-webpage-url", StringType(), True),
            StructField("vimeo-width", StringType(), True),
            StructField("year", StringType(), True),
            StructField("youtube-height", StringType(), True),
            StructField("youtube-id", StringType(), True),
            StructField("youtube-n-entries", StringType(), True),
            StructField("youtube-playlist", StringType(), True),
            StructField("youtube-playlist-index", StringType(), True),
            StructField("youtube-uploader", StringType(), True),
            StructField("youtube-uploader-id", StringType(), True),
            StructField("youtube-view-count", StringType(), True),
            StructField("youtube-webpage-url", StringType(), True),
            StructField("youtube-width", StringType(), True)
        ]), True),
])

# Local copy of gs://the-peoples-speech-west-europe/archive_org/Nov_6_2020/ALL_CAPTIONED_DATA.jsonl.gz
#df = spark.read.schema(schema).json("/home/ws15dgalvez/lingvo-copy/scripts/archive.org/ALL_CAPTIONED_DATA.jsonl.gz")
df = None
df = spark.read.schema(schema).json("gs://the-peoples-speech-west-europe/archive_org/Nov_6_2020/ALL_CAPTIONED_DATA.jsonl.gz")

In [4]:
# Calculate total number of hours. The null values are suspicious...
df.select(sum(col("metadata.duration").cast(DoubleType())) / 60 / 60).show()



+----------------------------------------------------+
|((sum(CAST(metadata.duration AS DOUBLE)) / 60) / 60)|
+----------------------------------------------------+
|                                  3881.1705555555554|
+----------------------------------------------------+



In [11]:
import langid

text_series = pd.Series(["bonjour", "Espania"], dtype=pd.StringDtype())
#df = spark.createDataFrame(pd.DataFrame(text_series, columns=["text"]))

def infer_language_func(text_column: pd.Series) -> pd.Series:
    return text_column.map(lambda string: langid.classify(string)[0] if string else "")

infer_language = pandas_udf(infer_language_func, returnType=StringType())

print("Inferred language:")
print(df.select(infer_language(col("metadata.description"))).groupBy(col('`infer_language_func(metadata.description)`')).count().orderBy('count', ascending=False).head(10))
print("Declared language:")
print(df.select(col("metadata.language")).groupBy(col('language')).count().orderBy('count', ascending=False).head(10))


Inferred language:
[Row(infer_language_func(metadata.description)='en', count=13162), Row(infer_language_func(metadata.description)='', count=3012), Row(infer_language_func(metadata.description)='de', count=80), Row(infer_language_func(metadata.description)='es', count=72), Row(infer_language_func(metadata.description)='fr', count=61), Row(infer_language_func(metadata.description)='fi', count=30), Row(infer_language_func(metadata.description)='nl', count=28), Row(infer_language_func(metadata.description)='la', count=20), Row(infer_language_func(metadata.description)='da', count=17), Row(infer_language_func(metadata.description)='it', count=16)]
Declared language:
[Row(language='English', count=7776), Row(language=None, count=6956), Row(language='eng', count=1791), Row(language='spanish', count=35), Row(language='english', count=15), Row(language='Mandarin Chinese', count=11), Row(language='Spanish', count=7), Row(language='English (dubbed)', count=6), Row(language='Hmong', count=3), Ro

In [5]:
#df.select(sum(explode(col("files.length")).cast(DoubleType())) /60. / 60.).show()

files_df = df.select(explode(col("files")).alias("file"))

In [10]:
files_df.printSchema()
# Why is "col" here?
#files_df.select("file.bitrate")
files_df.createOrReplaceTempView("files")
#spark.sql("SELECT SUM(CAST(file.length AS DOUBLE)) / 60. / 60. FROM files").show()
#spark.sql("SELECT file.length FROM files WHERE file.length is NOT NULL").show()

spark.sql("SELECT to_date('30:26', 'mm:ss')").show()

blah_df = spark.createDataFrame([('10:30',)], ['t'])
from pyspark.sql.functions import to_date, to_timestamp  # , to_unix_timestamp
print(blah_df.select(to_timestamp(blah_df.t, 'mm:ss').cast(LongType())).collect()) # - to_date('00:00', 'mm:ss').alias('date')).show()

# https://stackoverflow.com/a/54433013
# val timesTwoUDF = spark.udf.register("timesTwo", (x: Int) => x * 2)

spark.sql("""
SELECT CASE WHEN to_timestamp("file.length", 'mm:ss') IS NOT NULL THEN CAST(to_timestamp("file.length", 'mm:ss') AS Long)
    ELSE CAST ("file.length" AS LONG) / 60. / 60.
FROM files
WHERE file.length IS NOT NULL
AND file.source == 'original'
""").show()

#spark.conf.set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
#spark.conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/home/ws15dgalvez/the-peoples-speech-d0aa630b119d.json")



#to_date
#mm:ss



root
 |-- file: struct (nullable = true)
 |    |-- bitrate: string (nullable = true)
 |    |-- btih: string (nullable = true)
 |    |-- crc32: string (nullable = true)
 |    |-- format: string (nullable = true)
 |    |-- height: string (nullable = true)
 |    |-- length: string (nullable = true)
 |    |-- license: string (nullable = true)
 |    |-- md5: string (nullable = true)
 |    |-- mtime: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- original: string (nullable = true)
 |    |-- rotation: string (nullable = true)
 |    |-- sha1: string (nullable = true)
 |    |-- size: string (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- title: string (nullable = true)
 |    |-- track: string (nullable = true)
 |    |-- width: string (nullable = true)

+---------------------+
|to_date(30:26, mm:ss)|
+---------------------+
|           1970-01-01|
+---------------------+

[Row(CAST(to_timestamp(t, mm:ss) AS BIGINT)=630)]


ParseException: 
mismatched input 'to_timestamp' expecting {<EOF>, ';'}(line 2, pos 17)

== SQL ==

SELECT CASE WHEN to_timestamp("file.length", 'mm:ss') IS NOT NULL THEN CAST(to_timestamp("file.length", 'mm:ss') AS Long)
-----------------^^^
    ELSE CAST ("file.length" AS LONG) / 60. / 60.
FROM files
WHERE file.length IS NOT NULL
AND file.source == 'original'


In [58]:
?pyspark

Object `pyspark` not found.


In [1]:
import os
import shlex

import pandas as pd


#def infer_language_func(text_column: pd.Series) -> pd.Series:
#    return text_column.map(lambda string: langid.classify(string)[0] if string else "")
#infer_language = pandas_udf(infer_language_func, returnType=StringType())

def get_audio_data(raw_audio_binary_df: pd.DataFrame) -> pd.DataFrame:
    new_df = pd.DataFrame(cols=["signed_int16_waveform"])
    for row in raw_audio.itertuples():
        _, file_type = os.path.splitext(row.path)
        file_type = file_type.lstrip(".")
        # Always output in 16000 Hz
        
        with NamedTemporaryFile() as fh:
            cmd = f'sox -t {fmt} - -t wav --channels 1 --rate 16000 --encoding signed --bits 16 {fh.name}'
            p = subprocess.Popen(
                shlex.split(cmd),
              stdin=subprocess.PIPE,
              stdout=subprocess.PIPE,
              stderr=subprocess.PIPE)
            _, err = p.communicate(input=row.content)
            assert p.returncode == 0, err
            signed_int16_waveform = fh.read()
    return new_df

def voice_activity_detection():
    pass

# Change to */*.mp3 later
raw_audio_df = (spark.read.format("binaryFile")
                .option("pathGlobFilter", "*.mp3")
                .load("gs://the-peoples-speech-west-europe/archive_org/Nov_6_2020/ALL_CAPTIONED_DATA/bicycle_today_automobile_tomorrow"))
raw_audio_pd = raw_audio_df.toPandas()



In [7]:
raw_audio_pd

Unnamed: 0,path,modificationTime,length,content
0,gs://the-peoples-speech-west-europe/archive_or...,2020-11-07 11:30:50.704,4827136,"[73, 68, 51, 3, 0, 128, 0, 0, 6, 57, 67, 79, 7..."


In [None]:
raw_audio_pd = raw_audio_df.toPandas()
