This notebook uses GUODA datasets and services, which were created for working with large datasets:

https://github.com/bio-guoda/guoda-datasets


We load a well-formed Spark dataframe of iDigBio data and extract the fields that are of interest to our group.

You can read more about working with HDFS files here: https://github.com/bio-guoda/guoda-services/wiki/Jupyter-HDFS

In a new terminal, a command like: 
```hadoop fs -ls /guoda/data | grep idigbio | less ``` 

will show all of the available iDigBio datasets, for example:

/guoda/data/idigbio-20181201T023353-100k.parquet 
/guoda/data/idigbio-20181201T023353.parquet
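
The two paths above differ only in the `-100k` suffix. As a rough sketch, and assuming the file names follow an `idigbio-<timestamp>[-<suffix>].parquet` pattern (an assumption based only on the two names listed here, not a documented GUODA convention), the snapshot time and suffix can be pulled out like this. The `PATTERN` and `describe` names are hypothetical helpers for illustration.

```python
import re
from datetime import datetime

# Dataset paths exactly as listed above.
paths = [
    "/guoda/data/idigbio-20181201T023353-100k.parquet",
    "/guoda/data/idigbio-20181201T023353.parquet",
]

# Assumed naming pattern: idigbio-<YYYYMMDD>T<HHMMSS>[-<suffix>].parquet
PATTERN = re.compile(r"idigbio-(\d{8}T\d{6})(?:-(\w+))?\.parquet$")


def describe(path):
    """Return (snapshot datetime, suffix or None) for a dataset path."""
    match = PATTERN.search(path)
    if match is None:
        raise ValueError("unrecognized dataset name: " + path)
    stamp = datetime.strptime(match.group(1), "%Y%m%dT%H%M%S")
    return stamp, match.group(2)


for p in paths:
    print(p, describe(p))
```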

In [None]:
idb_df = sqlContext.read.parquet("/guoda/data/idigbio-20181201T023353-100k.parquet")
n = idb_df.count()

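The full iDigBio dataframe has ~115 million records. The `-100k` path loaded above appears to be a smaller sample; the counts later in this notebook match the full dataset. The schema looks like this: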

In [3]:
idb_df.printSchema()

root
 |-- barcodevalue: string (nullable = true)
 |-- basisofrecord: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- canonicalname: string (nullable = true)
 |-- catalognumber: string (nullable = true)
 |-- class: string (nullable = true)
 |-- collectioncode: string (nullable = true)
 |-- collectionid: string (nullable = true)
 |-- collectionname: string (nullable = true)
 |-- collector: string (nullable = true)
 |-- commonname: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- coordinateuncertainty: float (nullable = true)
 |-- country: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- county: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- coreid: string (nullable = true)
 |    |-- dc:rights: string (nullable = true)
 |    |-- dcterms:accessRights: string (nullable = true)
 |    |-- dcterms:bibliographicCitation: string (nullable = true)
 |    |-- dcterms:language: string (nullable = true)
 |    |-- d

Below is the list of columns that we want to summarize; these are the columns currently of interest to the DwC group. Many other columns contain unique identifiers, free text, or already normalized vocabularies, so it doesn't make sense to summarize everything at this point, though a future, more formal data product might.


In [4]:
columns = [
        "recordset",
        "data.dwc:basisOfRecord",
        "data.dwc:day",
        "data.dwc:disposition",
        "data.dwc:establishmentMeans",
        "data.dwc:geodeticDatum",
        "data.dwc:georeferenceVerificationStatus",
        "data.dwc:identificationQualifier",
        "data.dwc:identificationVerificationStatus",

        "data.dwc:lifeStage",
        "data.dwc:month",
        "data.dwc:nomenclaturalCode",
        "data.dwc:nomenclaturalStatus",
        "data.dwc:occurrenceStatus",

        "data.dwc:preparations",
        "data.dwc:reproductiveCondition",
        "data.dwc:sex",
        "data.dwc:typeStatus",
        "data.dwc:taxonRank",
        "data.dwc:taxonomicStatus",
        
        "data.dwc:behavior",
        "data.dwc:continent",
        "data.dwc:country",
        "data.dwc:countryCode",
        "data.dwc:island",
        "data.dwc:islandGroup",
        "data.dcterms:language",
        "data.dcterms:license",
        "data.dwc:verbatimSRS",
        "data.dwc:waterBody",
    
        "basisofrecord",
        "typestatus",
        "taxonrank",
        "taxonomicstatus"
        
        ]

We need to transform this dataset from a "wide" form to a "tall" form, and we use the following code to do so. You can read more about this type of tidy data transformation, known as "melt", here: https://vita.had.co.nz/papers/tidy-data.pdf

In [None]:
# Some fancy code from the internet. Note this transform could also be done 
# by manually typing out a bunch of unions, one for each column. For columns
# with highly repetitious values this is a reasonable all-at-once approach.
# For a more general solution, looping over the columns will result in much
# smaller data sizes and is more appropriate for a long term data product.

from pyspark.sql.functions import array, col, explode, struct, lit


def to_long(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])
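
To make the wide-to-tall reshaping concrete, here is a plain-Python sketch of the same idea on two made-up rows. It is not the Spark implementation above, only an illustration of what "melt" does. The `melt` helper and the `rs-1`/`rs-2` identifiers are invented for the example.

```python
def melt(rows, by):
    """Turn wide dict rows into tall rows of (by..., key, val)."""
    tall = []
    for row in rows:
        # Identifier columns are repeated on every output row.
        ids = {name: row[name] for name in by}
        for key, val in row.items():
            if key not in by:
                tall.append({**ids, "key": key, "val": val})
    return tall


# Two made-up wide rows: one column per field.
wide = [
    {"recordset": "rs-1", "dwc:sex": "male", "dwc:lifeStage": "adult"},
    {"recordset": "rs-2", "dwc:sex": None, "dwc:lifeStage": "juvenile"},
]

tall = melt(wide, by=["recordset"])
for row in tall:
    print(row)
```

Each wide row becomes one tall row per non-identifier column, so two rows with two value columns yield four key-value rows, and missing values are kept as `None`.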


We can now create a dataframe of key-value pairs from our columns, each tagged with its recordset identifier.

In [6]:
k_v_pairs = to_long(idb_df.select(columns), ["recordset"])
k_v_pairs.head(10)

[Row(recordset='568e209f-d072-4fd6-8b64-27954b0fd731', key='dwc:basisOfRecord', val='PreservedSpecimen'),
 Row(recordset='568e209f-d072-4fd6-8b64-27954b0fd731', key='dwc:day', val='1'),
 Row(recordset='568e209f-d072-4fd6-8b64-27954b0fd731', key='dwc:disposition', val=None),
 Row(recordset='568e209f-d072-4fd6-8b64-27954b0fd731', key='dwc:establishmentMeans', val=None),
 Row(recordset='568e209f-d072-4fd6-8b64-27954b0fd731', key='dwc:geodeticDatum', val=None),
 Row(recordset='568e209f-d072-4fd6-8b64-27954b0fd731', key='dwc:georeferenceVerificationStatus', val=None),
 Row(recordset='568e209f-d072-4fd6-8b64-27954b0fd731', key='dwc:identificationQualifier', val=None),
 Row(recordset='568e209f-d072-4fd6-8b64-27954b0fd731', key='dwc:identificationVerificationStatus', val=None),
 Row(recordset='568e209f-d072-4fd6-8b64-27954b0fd731', key='dwc:lifeStage', val='adult'),
 Row(recordset='568e209f-d072-4fd6-8b64-27954b0fd731', key='dwc:month', val='2')]

We can get an idea of the size of the data we are looking at from the number of key-value pairs returned. This number should equal the number of summarized columns (33, excluding recordset) multiplied by the number of rows.

In [7]:
k_v_pairs.count()

3800724543
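
As a quick sanity check on that count: the `columns` list has 33 entries besides `recordset`, so dividing the pair count by 33 should give the number of rows. The sketch below simply redoes that arithmetic in plain Python with the count printed above.

```python
n_value_columns = 33      # entries in `columns` other than "recordset"
n_pairs = 3800724543      # k_v_pairs.count() from the cell above

# Every input row contributes exactly one pair per value column.
n_rows, remainder = divmod(n_pairs, n_value_columns)
print(n_rows, remainder)  # 115173471 0
```

The division is exact and gives about 115 million rows, which agrees with the size of the iDigBio dataframe mentioned earlier.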

If we count how many times each value occurs within each column of each recordset, we can use the resulting dataframe of unique values for further analysis.

In [8]:
index = (k_v_pairs
         .groupBy(k_v_pairs.recordset, k_v_pairs.key, k_v_pairs.val)
         .count()
         )
index.head(10)

[Row(recordset='7450a9e3-ef95-4f9e-8260-09b498d2c5e6', key='dwc:country', val='Cuba', count=5574),
 Row(recordset='c4bb08d4-c310-4879-abee-1b3986e8e0ca', key='dwc:day', val='4', count=8879),
 Row(recordset='91c5eec8-0cdc-4be2-9a99-a15ae5ec3edc', key='dwc:basisOfRecord', val='PreservedSpecimen', count=66480),
 Row(recordset='71b8ffab-444e-43f9-9a9c-5c42b0eaa5eb', key='dwc:nomenclaturalCode', val='ICZN', count=78978),
 Row(recordset='78ee1a12-9e8a-4d9c-84de-e2dfce4e1447', key='dwc:day', val='9', count=1749),
 Row(recordset='92e4e092-6dcb-46bc-85a0-dea8310aba45', key='taxonomicstatus', val='doubtful', count=1713),
 Row(recordset='271a9ce9-c6d3-4b63-a722-cb0adc48863f', key='dwc:sex', val='male', count=224739),
 Row(recordset='d7b285d4-2643-45ee-9302-b0c3d51dda5c', key='dwc:day', val='8', count=9132),
 Row(recordset='8dc14464-57b3-423e-8cb0-950ab8f36b6f', key='basisofrecord', val='preservedspecimen', count=37465),
 Row(recordset='1c8d18f4-5af2-4d86-98d2-8a5ed06456e2', key='dwc:basisOfRecord

In [11]:
index.count()

1206331
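
The grouping step collapses repeated (recordset, key, val) triples into one row with a count. A minimal plain-Python sketch of the same idea, using `collections.Counter` on made-up triples, is below. It only illustrates the logic and says nothing about how Spark executes it.

```python
from collections import Counter

# Made-up (recordset, key, val) triples; "rs-1" and "rs-2" are placeholders.
pairs = [
    ("rs-1", "dwc:sex", "male"),
    ("rs-1", "dwc:sex", "male"),
    ("rs-1", "dwc:sex", None),
    ("rs-2", "dwc:sex", "male"),
]

# One entry per distinct triple, with the number of times it occurred.
value_counts = Counter(pairs)
for (recordset, key, val), count in value_counts.items():
    print(recordset, key, val, count)
```

Four input triples reduce to three distinct ones here; the missing value is counted as its own group, just as the `val=None` rows appear in the Spark output.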

We can now reduce this entire dataset down to the elements we are most interested in: paleontology collections. Using the iDigBio U.S. Collections catalog, we can get a list of paleo recordset UUIDs:

In [16]:
paleoRecordsets = [
"6c6f34ed-58a4-4ba2-b9c7-34524f79a349",
"137ed4cd-5172-45a5-acdb-8e1de9a64e32",
"5ab348ab-439a-4697-925c-d6abe0c09b92",
"f9a33279-d6ba-41c7-a511-ef6adfcb6e20",
"95ecb448-3c1f-4145-8565-4f6d51beb62c", 
"271a9ce9-c6d3-4b63-a722-cb0adc48863f",
"1ba0bbad-28a7-4c50-8992-a028f79d1dc5",
"62c310ac-e1ff-47bc-860d-0471a84ed0d3",
"b1f0612a-bc21-424f-b9c1-3bba69ad4f54",
"7b0809fb-fd62-4733-8f40-74ceb04cbcac",
"7ae4d15d-62e2-459b-842a-446f921b9d3f",
"1ebb0c8e-31f2-4564-b75d-65196bee4f09",
"71b8ffab-444e-43f9-9a9c-5c42b0eaa5eb",
"d621e959-2633-4ec1-a2a2-5d97cd818b47",
"0220907a-0463-4ae0-8a0b-77f5e80fff40",
"84c24d87-e4ad-4165-8e86-5ae1a249c196",
"b26fa674-6300-4ea0-a8e3-fc0ce32b5226",
"41b119de-f745-482d-be42-a0155bc76e5d",
"0e162e0a-bf3e-4710-9357-44258ca12abb",
"667c2736-bcd3-4a6a-abf4-db5d2dc815c4",
"2ec3b31e-c86b-4ce9-b265-77c8c3f9643c",
"81316846-80cb-4913-8941-b31537761eb0", 
"1c8d18f4-5af2-4d86-98d2-8a5ed06456e2", 
"7c2c5cdc-80e6-49d5-8e95-08fc7da0a370",
"db3181c9-48dd-489f-96ab-a5888f5a938c", 
"4dfb5828-3653-4604-ac00-db1e1da98b02", 
"7757c07f-18fd-45c2-84cc-60bd3742e100",
"b7a79601-c07b-46d5-bd09-d4472b0d9431",
"9e66257f-21a9-491a-ac23-06b7b62ceeb7",
"d11f19ae-e946-4a0e-83e5-2052ae8cca62",
"ba77d411-4179-4dbd-b6c1-39b8a71ae795",
"e27f0218-47e0-41bc-9086-9d9169096e90",
"879d475f-4b76-4d18-8cf6-a7e5a6d44926",
"a2beb85e-f2b8-4366-8b3b-e5c5cc117aaf",
"2c2cc29c-3572-4568-a129-c8cbec34ccbe",
"a2a7754b-2346-496d-b681-eb754ef32b9e",
"0d05a365-36e8-4150-a350-23ed33f79b17",
"5082e6c8-8f5b-4bf6-a930-e3e6de7bf6fb",
"bf049384-ffe2-4418-a1a3-fc5552ba850f"
]

In [17]:
paleo = index[index['recordset'].isin(paleoRecordsets)]
paleo.head(10)

[Row(recordset='71b8ffab-444e-43f9-9a9c-5c42b0eaa5eb', key='dwc:nomenclaturalCode', val='ICZN', count=78978),
 Row(recordset='271a9ce9-c6d3-4b63-a722-cb0adc48863f', key='dwc:sex', val='male', count=224739),
 Row(recordset='1c8d18f4-5af2-4d86-98d2-8a5ed06456e2', key='dwc:basisOfRecord', val='FossilSpecimen', count=11877),
 Row(recordset='81316846-80cb-4913-8941-b31537761eb0', key='taxonrank', val='species', count=8752),
 Row(recordset='271a9ce9-c6d3-4b63-a722-cb0adc48863f', key='dwc:country', val='Jordan', count=334),
 Row(recordset='271a9ce9-c6d3-4b63-a722-cb0adc48863f', key='typestatus', val='holotype of diadema affine', count=1),
 Row(recordset='271a9ce9-c6d3-4b63-a722-cb0adc48863f', key='dwc:typeStatus', val='Voucher of Clavilithes scalaris', count=16),
 Row(recordset='271a9ce9-c6d3-4b63-a722-cb0adc48863f', key='dwc:typeStatus', val='Syntype of Caranx secundus| Syntype of Caranx secundus', count=2),
 Row(recordset='271a9ce9-c6d3-4b63-a722-cb0adc48863f', key='typestatus', val='syntyp

In [18]:
paleo.count()

186790
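
The filter keeps only rows whose recordset appears in the UUID list. As an illustrative sketch in plain Python (not the Spark `isin` call itself), the same membership test can be written with a set, which also collapses any UUID that is listed more than once. The three sample rows are taken from the outputs shown earlier.

```python
# A few (recordset, key, val, count) rows copied from the index output.
index_rows = [
    ("7450a9e3-ef95-4f9e-8260-09b498d2c5e6", "dwc:country", "Cuba", 5574),
    ("71b8ffab-444e-43f9-9a9c-5c42b0eaa5eb", "dwc:nomenclaturalCode", "ICZN", 78978),
    ("271a9ce9-c6d3-4b63-a722-cb0adc48863f", "dwc:sex", "male", 224739),
]

# A small subset of the paleo UUIDs; repeating one has no effect in a set.
paleo_ids = {
    "271a9ce9-c6d3-4b63-a722-cb0adc48863f",
    "71b8ffab-444e-43f9-9a9c-5c42b0eaa5eb",
    "271a9ce9-c6d3-4b63-a722-cb0adc48863f",
}

# Keep only rows whose recordset is in the paleo set.
paleo_rows = [row for row in index_rows if row[0] in paleo_ids]
for row in paleo_rows:
    print(row)
```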

In [20]:
(paleo
 .write
 .parquet("/home/kevinlove/paleo/outputs/paleo-values-20182811.parquet")
)

In [24]:
(paleo
 .write
 .format("com.databricks.spark.csv")
 .option("header", "false")
 .save("/home/kevinlove/paleo/outputs/paleo-value-20182811.csv")
)

We can continue to work with these data here, or save a copy locally and access them via the filesystem.

In [25]:
from hdfs import InsecureClient
client = InsecureClient('http://mesos01.acis.ufl.edu:50070/', user='kevinlove')
client.download('/home/kevinlove/paleo/outputs/paleo-value-20182811.csv', '/home/kevinlove/paleo/outputs/paleo-values-20182811', n_threads=5)

'/home/kevinlove/paleo/outputs/paleo-values-20182811'
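
Spark writes CSV output as a directory of `part-*` files rather than a single file, so the local copy is a directory too. The sketch below shows one way to read such a directory back with the standard library. It assumes four headerless columns in the order recordset, key, val, count (matching the `header` option set to `false` above); `read_parts` is a hypothetical helper, and a temporary directory stands in for the downloaded copy so the example runs on its own.

```python
import csv
import glob
import os
import tempfile


def read_parts(directory):
    """Read every part-* file in a Spark CSV output directory into tuples."""
    rows = []
    for path in sorted(glob.glob(os.path.join(directory, "part-*"))):
        with open(path, newline="") as handle:
            # Assumed column order: recordset, key, val, count (no header).
            for recordset, key, val, count in csv.reader(handle):
                rows.append((recordset, key, val, int(count)))
    return rows


# Demo: a throwaway directory standing in for the downloaded output.
with tempfile.TemporaryDirectory() as demo_dir:
    with open(os.path.join(demo_dir, "part-00000"), "w", newline="") as handle:
        csv.writer(handle).writerows([
            ["271a9ce9-c6d3-4b63-a722-cb0adc48863f", "dwc:sex", "male", 224739],
            ["71b8ffab-444e-43f9-9a9c-5c42b0eaa5eb", "dwc:nomenclaturalCode", "ICZN", 78978],
        ])
    rows = read_parts(demo_dir)

print(rows)
```

How missing values are encoded in the real files depends on the CSV writer's options, so check a few rows before relying on the `val` column.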