Skip to content

Commit

Permalink
Add support for Databricks (Close #9)
Browse files Browse the repository at this point in the history
  • Loading branch information
agnessnowplow committed Aug 4, 2022
1 parent 4edd780 commit 4f20453
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 15 deletions.
8 changes: 4 additions & 4 deletions README.md
Expand Up @@ -12,10 +12,10 @@ Please refer to the [doc site][snowplow-media-player-docs] for a full breakdown

The snowplow-media-player v0.3.0 package currently supports the following adapters:

| Warehouse | dbt versions | snowplow-web version | snowplow-media-player version |
|:--------------------------------------------:|:-------------------:|:--------------------:|:-----------------------------:|
| Redshift, Postgres, BigQuery & Snowflake | >=1.0.0 to <2.0.0 | >=0.7.0 to <0.8.0 | 0.2.0 |
| Redshift & Postgres | >=0.20.0 to <1.1.0 | >=0.6.0 to <0.7.0 | 0.1.0 |
| Warehouse | dbt versions | snowplow-web version | snowplow-media-player version |
|:----------------------------------------------------:|:-------------------:|:--------------------:|:-----------------------------:|
| BigQuery, Databricks, Postgres, Redshift & Snowflake | >=1.0.0 to <2.0.0 | >=0.8.0 to <0.9.0 | 0.3.0 |
| Redshift & Postgres | >=0.20.0 to <1.1.0 | >=0.6.0 to <0.7.0 | 0.1.0 |


### Requirements
Expand Down
7 changes: 7 additions & 0 deletions dbt_project.yml
Expand Up @@ -30,6 +30,11 @@ vars:
snowplow__enable_whatwg_media: false
# set to true if the HTML5 video element context schema is enabled
snowplow__enable_whatwg_video: false
snowplow__media_player_event_context: "{{ source('atomic', 'com_snowplowanalytics_snowplow_media_player_event_1') }}"
snowplow__media_player_context: "{{ source('atomic', 'com_snowplowanalytics_snowplow_media_player_1') }}"
snowplow__youtube_context: "{{ source('atomic', 'com_youtube_youtube_1') }}"
snowplow__html5_media_element_context: "{{ source('atomic', 'org_whatwg_media_element_1') }}"
snowplow__html5_video_element_context: "{{ source('atomic', 'org_whatwg_video_element_1') }}"

models:
snowplow_media_player:
Expand All @@ -45,6 +50,8 @@ models:
interactions_this_run:
bigquery:
enabled: "{{ target.type == 'bigquery' | as_bool() }}"
databricks:
enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}"
redshift_postgres:
enabled: "{{ target.type in ['redshift', 'postgres'] | as_bool() }}"
snowflake:
Expand Down
2 changes: 1 addition & 1 deletion docs/markdown/snowplow_media_player_overview.md
Expand Up @@ -34,7 +34,7 @@ The additional `_pivot_base` table is there to calculate the percent_progress bo

## Adapter Support

The Snowplow Media Player v0.2.0 package currently supports Redshift, Postgres, Snowflake and BigQuery.
The Snowplow Media Player v0.3.0 package currently supports BigQuery, Databricks, Postgres, Redshift and Snowflake.

## Requirements

Expand Down
6 changes: 5 additions & 1 deletion models/custom/snowplow_media_player_session_stats.sql
Expand Up @@ -6,7 +6,7 @@
partition_by = snowplow_utils.get_partition_by(bigquery_partition_by={
"field": "start_tstamp",
"data_type": "timestamp"
}),
}, databricks_partition_by='start_tstamp_date'),
cluster_by=snowplow_utils.get_cluster_by(bigquery_cols=["domain_userid"]),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt'))
)
Expand Down Expand Up @@ -41,4 +41,8 @@ with prep as (

select *

{% if target.type in ['databricks', 'spark'] -%}
, date(start_tstamp) as start_tstamp_date
{%- endif %}

from prep
6 changes: 5 additions & 1 deletion models/custom/snowplow_media_player_user_stats.sql
Expand Up @@ -6,7 +6,7 @@
partition_by = snowplow_utils.get_partition_by(bigquery_partition_by={
"field": "first_play",
"data_type": "timestamp"
}),
}, databricks_partition_by='first_play_date'),
cluster_by=snowplow_utils.get_cluster_by(bigquery_cols=["domain_userid"]),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt'))
)
Expand Down Expand Up @@ -37,4 +37,8 @@ with prep as (

select *

{% if target.type in ['databricks', 'spark'] -%}
, date(first_play) as first_play_date
{%- endif %}

from prep
@@ -0,0 +1,66 @@
{{
  config(
    materialized='table',
    tags=["this_run"],
    sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt'))
  )
}}

{#
  Databricks / Spark build of the media player interactions model for the current run.

  prep:   reads rows with event_name = 'media_player_event' from
          snowplow_web_base_events_this_run and flattens the media player event,
          the media player context and whichever player-specific contexts are
          enabled into one row per event.
  final:  adds a surrogate play_id (page_view_id + media_id) and joins
          snowplow_media_player_pivot_base on percent_progress to derive
          play_time_sec and play_time_sec_muted.

  Variables controlling the player-specific columns
  (media_id, media_player_type, source_url, media_type, playback_quality):
    snowplow__enable_youtube       - read the YouTube context
    snowplow__enable_whatwg_media  - read the HTML5 media element context
    snowplow__enable_whatwg_video  - read the HTML5 video element context
                                     (requires snowplow__enable_whatwg_media)
  A compiler error is raised when video is enabled without media, or when no
  media context is enabled at all.
#}

with prep as (

  select
    e.event_id,
    e.page_view_id,
    e.domain_sessionid,
    e.domain_userid,
    e.page_referrer,
    e.page_url,
    e.unstruct_event_com_snowplowanalytics_snowplow_media_player_event_1.label::STRING as media_label,
    round(e.contexts_com_snowplowanalytics_snowplow_media_player_1[0].duration::float) as duration,
    e.geo_region_name,
    e.br_name,
    e.dvce_type,
    e.os_name,
    e.os_timezone,
    e.unstruct_event_com_snowplowanalytics_snowplow_media_player_event_1.type::STRING as event_type,
    e.derived_tstamp as start_tstamp,
    e.contexts_com_snowplowanalytics_snowplow_media_player_1[0].current_time::float as player_current_time,
    {# The fallback is the string '1' so both coalesce arguments share the STRING type (the original mixed STRING with the integer 1). #}
    coalesce(e.contexts_com_snowplowanalytics_snowplow_media_player_1[0].playback_rate::STRING, '1') as playback_rate,
    {# An 'ended' event always counts as 100 percent progress, whatever the context reports. #}
    case
      when e.unstruct_event_com_snowplowanalytics_snowplow_media_player_event_1.type::STRING = 'ended' then 100
      else e.contexts_com_snowplowanalytics_snowplow_media_player_1[0].percent_progress::int
    end as percent_progress,
    e.contexts_com_snowplowanalytics_snowplow_media_player_1[0].muted::STRING as is_muted,
    e.contexts_com_snowplowanalytics_snowplow_media_player_1[0].is_live::STRING as is_live,
    e.contexts_com_snowplowanalytics_snowplow_media_player_1[0].loop::STRING as loop,
    e.contexts_com_snowplowanalytics_snowplow_media_player_1[0].volume::STRING as volume,

    {% if var("snowplow__enable_whatwg_media") is false and var("snowplow__enable_whatwg_video") %}
      {{ exceptions.raise_compiler_error("variable: snowplow__enable_whatwg_video is enabled but variable: snowplow__enable_whatwg_media is not, both need to be enabled for modelling html5 video tracking data.") }}

    {% elif var("snowplow__enable_youtube") %}

      {% if var("snowplow__enable_whatwg_media") %}
        {# YouTube and HTML5 media are both tracked: prefer the YouTube context and fall back to the HTML5 one. #}
        coalesce(e.contexts_com_youtube_youtube_1[0].player_id::STRING, e.contexts_org_whatwg_media_element_1[0].html_id::STRING) as media_id,
        case
          when e.contexts_com_youtube_youtube_1[0].player_id is not null then 'com.youtube-youtube'
          when e.contexts_org_whatwg_media_element_1[0].html_id::STRING is not null then 'org.whatwg-media_element'
          else 'unknown'
        end as media_player_type,
        coalesce(e.contexts_com_youtube_youtube_1[0].url::STRING, e.contexts_org_whatwg_media_element_1[0].current_src::STRING) as source_url,
        case when e.contexts_org_whatwg_media_element_1[0].media_type::STRING = 'audio' then 'audio' else 'video' end as media_type,
        {% if var("snowplow__enable_whatwg_video") %}
          {# Mirrors the Snowflake model: YouTube quality first, otherwise the HTML5 "<width>x<height>". #}
          coalesce(e.contexts_com_youtube_youtube_1[0].playback_quality::STRING, e.contexts_org_whatwg_video_element_1[0].video_width::STRING||'x'||e.contexts_org_whatwg_video_element_1[0].video_height::STRING) as playback_quality
        {% else %}
          {# No video element context: keep the YouTube quality where present, 'N/A' otherwise. #}
          coalesce(e.contexts_com_youtube_youtube_1[0].playback_quality::STRING, 'N/A') as playback_quality
        {% endif %}

      {% else %}
        {# YouTube is the only enabled media context. #}
        e.contexts_com_youtube_youtube_1[0].player_id::STRING as media_id,
        'com.youtube-youtube' as media_player_type,
        e.contexts_com_youtube_youtube_1[0].url::STRING as source_url,
        'video' as media_type,
        e.contexts_com_youtube_youtube_1[0].playback_quality::STRING as playback_quality
      {% endif %}

    {% elif var("snowplow__enable_whatwg_media") %}
      {# HTML5 media element is enabled without YouTube. #}
      e.contexts_org_whatwg_media_element_1[0].html_id::STRING as media_id,
      'org.whatwg-media_element' as media_player_type,
      e.contexts_org_whatwg_media_element_1[0].current_src::STRING as source_url,
      case when e.contexts_org_whatwg_media_element_1[0].media_type::STRING = 'audio' then 'audio' else 'video' end as media_type,
      {% if var("snowplow__enable_whatwg_video") %}
        e.contexts_org_whatwg_video_element_1[0].video_width::STRING||'x'||e.contexts_org_whatwg_video_element_1[0].video_height::STRING as playback_quality
      {% else %}
        'N/A' as playback_quality
      {% endif %}

    {% else %}
      {{ exceptions.raise_compiler_error("No media context enabled. Please enable as many of the following variables as required: snowplow__enable_youtube, snowplow__enable_whatwg_media, snowplow__enable_whatwg_video") }}
    {% endif %}

  from {{ ref("snowplow_web_base_events_this_run") }} as e

  where e.event_name = 'media_player_event'
)

select
  {{ dbt_utils.surrogate_key(['p.page_view_id', 'p.media_id' ]) }} as play_id,
  p.*,
  {# Rows with no matching pivot row (left join miss) get 0 seconds via the coalesce. #}
  coalesce(cast(piv.weight_rate * p.duration / 100 as {{ dbt_utils.type_int() }}), 0) as play_time_sec,
  {# NOTE(review): is_muted is emitted as STRING in prep, so this comparison with a boolean literal depends on the warehouse's implicit cast - confirm, or cast explicitly. #}
  coalesce(cast(case when p.is_muted = true then piv.weight_rate * p.duration / 100 else 0 end as {{ dbt_utils.type_int() }}), 0) as play_time_sec_muted

from prep p

left join {{ ref("snowplow_media_player_pivot_base") }} piv
  on p.percent_progress = piv.percent_progress
Expand Up @@ -38,7 +38,7 @@ with prep as (
coalesce(e.contexts_com_youtube_youtube_1[0]:playerId::varchar, e.contexts_org_whatwg_media_element_1[0]:htmlId::varchar) as media_id,
case when e.contexts_com_youtube_youtube_1[0]:playerId is not null then 'com.youtube-youtube'
when e.contexts_org_whatwg_media_element_1[0]:htmlId::varchar is not null then 'org.whatwg-media_element' else 'unknown' end as media_player_type,
coalesce(e.contexts_com_youtube_youtube_1[0]:url::varchar, e.contexts_org_whatwg_media_element_1[0]:currentSource::varchar) as source_url,
coalesce(e.contexts_com_youtube_youtube_1[0]:url::varchar, e.contexts_org_whatwg_media_element_1[0]:currentSrc::varchar) as source_url,
case when e.contexts_org_whatwg_media_element_1[0]:mediaType::varchar = 'audio' then 'audio' else 'video' end as media_type,
{% if var("snowplow__enable_whatwg_video") %}
coalesce(e.contexts_com_youtube_youtube_1[0]:playbackQuality::varchar, e.contexts_org_whatwg_video_element_1[0]:videoWidth::varchar||'x'||e.contexts_org_whatwg_video_element_1[0]:videoHeight::varchar) as playback_quality
Expand Down
9 changes: 8 additions & 1 deletion models/web/scratch/snowplow_media_player_base_this_run.sql
Expand Up @@ -5,7 +5,7 @@
partition_by = snowplow_utils.get_partition_by(bigquery_partition_by={
"field": "start_tstamp",
"data_type": "timestamp"
}),
}, databricks_partition_by='start_tstamp_date'),
cluster_by=snowplow_utils.get_cluster_by(bigquery_cols=["media_id"]),
sort = 'start_tstamp',
dist = 'play_id',
Expand Down Expand Up @@ -50,6 +50,9 @@ with prep as (
{%- elif target.type == 'bigquery' %}
string_agg(cast(i.percent_progress as string), ',' order by i.percent_progress) as percent_progress_reached,

{%- elif target.type in ['databricks', 'spark'] %}
{# 'spark' is accepted alongside 'databricks', matching the target check used for the start_tstamp_date column in this model. Values are cast to string before array_sort, so the joined list is ordered lexicographically. #}
array_join(array_sort(collect_set(cast(i.percent_progress as string))), ',') as percent_progress_reached,

{%- else -%}
{{ exceptions.raise_compiler_error("Target is not supported. Got: " ~ target.type) }}

Expand Down Expand Up @@ -137,6 +140,10 @@ select
d.seeks,
d.percent_progress_reached

{% if target.type in ['databricks', 'spark'] -%}
, date(start_tstamp) as start_tstamp_date
{%- endif %}

from dedupe d

left join retention_rate r
Expand Down
6 changes: 5 additions & 1 deletion models/web/snowplow_media_player_base.sql
Expand Up @@ -9,14 +9,18 @@
partition_by = snowplow_utils.get_partition_by(bigquery_partition_by={
"field": "start_tstamp",
"data_type": "timestamp"
}),
}, databricks_partition_by='start_tstamp_date'),
cluster_by=snowplow_utils.get_cluster_by(bigquery_cols=["media_id"]),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt'))
)
}}

select *

{% if target.type in ['databricks', 'spark'] -%}
, date(start_tstamp) as start_tstamp_date
{%- endif %}

from {{ ref('snowplow_media_player_base_this_run') }}

where {{ snowplow_utils.is_run_with_new_events('snowplow_web') }} --returns false if run doesn't contain new events.
14 changes: 9 additions & 5 deletions models/web/snowplow_media_player_media_stats.sql
Expand Up @@ -8,7 +8,7 @@
partition_by = snowplow_utils.get_partition_by(bigquery_partition_by={
"field": "first_play",
"data_type": "timestamp"
}),
}, databricks_partition_by='first_play_date'),
cluster_by=snowplow_utils.get_cluster_by(bigquery_cols=["media_id"]),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt'))
)
Expand Down Expand Up @@ -87,7 +87,7 @@ group by 1,2,4,5
{%- elif target.type == 'postgres' %}
string_to_array(p.percent_progress_reached, ',') as percent_progress_reached

{%- elif target.type in ['snowflake', 'bigquery'] %}
{%- elif target.type in ['snowflake', 'bigquery', 'databricks', 'spark'] %}
{# Turns the comma-separated percent_progress_reached string into an array; 'spark' is accepted alongside 'databricks', as in the first_play_date target check of this model. #}
split(p.percent_progress_reached, ',') as percent_progress_reached

{%- else -%}
Expand Down Expand Up @@ -283,12 +283,12 @@ from percent_progress_reached t, table(flatten(t.percent_progress_reached)) r

)

{%- elif target.type == 'bigquery' %}
{%- elif target.type in ['databricks', 'spark'] %}
{# 'spark' is accepted alongside 'databricks', as in the first_play_date target check of this model. #}
, unnesting as (

select media_id, r as value_reached
{# explode() is used as a select-list generator: one output row per array element. LATERAL VIEW is only valid after the FROM clause, not inside the select list. #}
select media_id, explode(percent_progress_reached) as value_reached

from percent_progress_reached t, unnest(t.percent_progress_reached) r
from percent_progress_reached

)

Expand Down Expand Up @@ -321,6 +321,10 @@ select
p.avg_retention_rate,
l.last_base_tstamp,

{% if target.type in ['databricks', 'spark'] -%}
date(first_play) as first_play_date,
{%- endif %}

{% if is_incremental() %}

{% for element in get_percentage_boundaries(var("snowplow__percent_progress_boundaries")) %}
Expand Down

0 comments on commit 4f20453

Please sign in to comment.