In [14]:
import boto3, time, json
import pandas as pd

In [2]:
# Module-level Athena client (us-east-1); the query helper functions in this
# notebook reference it by this global name.
athena = boto3.client('athena', region_name='us-east-1')

In [3]:
def get_query_status(queryId):
    """Return the state string of an Athena query execution.

    Looks the execution up through the module-level ``athena`` client and
    reads ``QueryExecution -> Status -> State`` from the response.

    Parameters
    ----------
    queryId : str
        The QueryExecutionId to look up.
    """
    response = athena.get_query_execution(QueryExecutionId=queryId)
    execution = response.get('QueryExecution')
    status_info = execution.get('Status')
    return status_info.get('State')

def run_and_wait_for_query(query_string='select count(distinct(uid)) from youthmappers'):
    """Start an Athena query and block until it is no longer QUEUED or RUNNING.

    Parameters
    ----------
    query_string : str
        SQL text to execute. Defaults to counting the distinct uids in the
        ``youthmappers`` table.

    Returns
    -------
    str
        The QueryExecutionId of the submitted query. It is returned whatever
        the final state is; the final state is only printed, so callers that
        care about failure should check the printed status.
    """
    query = athena.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={
            'Database': 'default'
        },
        ResultConfiguration={
            'OutputLocation': 's3://yetigeolabs/youthmappers/query_results/',
        },
        WorkGroup='youthmappers'
    )
    queryId = query.get('QueryExecutionId')

    # Fetch the state exactly once per iteration and reuse it for the loop
    # test and the progress message. Re-querying between the test and the
    # print could report a terminal state and then still sleep.
    count = 1
    status = get_query_status(queryId)
    while status in ["QUEUED", "RUNNING"]:
        # The wait grows linearly (5s, 10s, 15s, ...) with each poll.
        print(status, end=f" .. ({5*count})\n")
        time.sleep(5*count)
        count += 1
        status = get_query_status(queryId)
    print("Status: " + status)
    return queryId

In [4]:
# Baseline: run the default distinct-uid count query before uploading new data.
# Note that `count` holds the query execution ID, not a number.
count = run_and_wait_for_query()
res = athena.get_query_results(QueryExecutionId=count, MaxResults=2)

# Take the first column of every returned row; the value is read from the
# second row (presumably row 0 is the header row - confirm against the API).
first_column = [
    row.get('Data')[0].get('VarCharValue')
    for row in res.get('ResultSet').get('Rows')
]
youthmapper_count = int(first_column[1])
print(f"Athena has {youthmapper_count} YouthMappers UIDs")

QUEUED .. (5)
Status: SUCCEEDED
Athena has 2815 YouthMappers UIDs


In [6]:
# Check for the latest list of Mappers?

## Upload the latest list of Mappers to S3

In [5]:
# Build the latest list of mappers from the local osm-teams-api files (youthmappers.csv and chapters.json)

In [20]:
# Load the mapper roster (indexed by uid) and the chapter metadata.
mappers = pd.read_csv('../osm-teams-api/youthmappers.csv').set_index('uid')
chapters = pd.read_json('../osm-teams-api/chapters.json')

# `location` is decoded as JSON when present: coordinates[0] becomes the
# chapter longitude and coordinates[1] the chapter latitude; missing -> None.
chapters['chapter_lon'] = chapters.location.apply(
    lambda loc: json.loads(loc).get('coordinates')[0] if pd.notnull(loc) else None
)
chapters['chapter_lat'] = chapters.location.apply(
    lambda loc: json.loads(loc).get('coordinates')[1] if pd.notnull(loc) else None
)

# Attach chapter attributes to each mapper: `team_id` on the mapper side is
# matched against the index of the chapters frame.
chapter_info = chapters[
    ['name', 'University', 'City', 'Country', 'chapter_lon', 'chapter_lat']
].rename(columns={'name': 'Chapter'})
t = (
    mappers
    .merge(chapter_info, left_on='team_id', right_index=True)
    .reset_index()
    .rename(columns={'index': 'uid'})
)
t.head()

Unnamed: 0,uid,username,Gender,team_id,Alumni,Steering Committee,Regional Ambassador,Mentor,Chapter,University,City,Country,chapter_lon,chapter_lat
0,27099,MAPconcierge,,57,,,,,YouthMappers AGU,Aoyama Gakuin University,Sagamihara,Japan,139.710309,35.660551
1,8353419,f1kuni,male,57,,,,,YouthMappers AGU,Aoyama Gakuin University,Sagamihara,Japan,139.710309,35.660551
2,10419753,OkayaT,female,57,,,,,YouthMappers AGU,Aoyama Gakuin University,Sagamihara,Japan,139.710309,35.660551
3,11830680,caucasus1327,male,57,,,,,YouthMappers AGU,Aoyama Gakuin University,Sagamihara,Japan,139.710309,35.660551
4,13067323,ibukilego,male,57,,,,,YouthMappers AGU,Aoyama Gakuin University,Sagamihara,Japan,139.710309,35.660551


In [21]:
# Write the merged table as tab-separated text with no header row and no
# index column; this local file is what gets uploaded to S3 afterwards.
t.to_csv('youthmappers_for_athena.tsv', sep="\t", header=False, index=False)

In [22]:
# Upload the freshly written TSV to the S3 prefix for the mappers table.
s3 = boto3.resource('s3')
# Use a context manager so the local file handle is closed once the upload
# finishes (the bare open() previously left it open for the kernel's lifetime).
with open('youthmappers_for_athena.tsv', 'rb') as data:
    uploaded_object = s3.Bucket('yetigeolabs').put_object(
        Key='youthmappers/mappers/youthmappers-for-athena.tsv', Body=data
    )
# Keep the returned s3.Object as the cell's last expression so it is displayed.
uploaded_object

s3.Object(bucket_name='yetigeolabs', key='youthmappers/mappers/youthmappers-for-athena.tsv')

In [23]:
# Re-run the default distinct-uid count query after the upload to confirm
# Athena picks up the new file. `count` holds the query execution ID.
count = run_and_wait_for_query()
res = athena.get_query_results(QueryExecutionId=count, MaxResults=2)

# First column of every returned row; the value is read from the second row
# (presumably row 0 is the header row - confirm against the API).
first_column = [
    row.get('Data')[0].get('VarCharValue')
    for row in res.get('ResultSet').get('Rows')
]
youthmapper_count = int(first_column[1])
print(f"Athena now sees {youthmapper_count} YouthMappers UIDs")

QUEUED .. (5)
Status: SUCCEEDED
Athena now sees 2896 YouthMappers UIDs


# Obtaining all YouthMappers Changesets

In [24]:
# Define the main changesets query. This cell only builds the SQL string; it is
# submitted to Athena in a later cell via run_and_wait_for_query.
#
# The SQL is organised as four CTEs followed by a final SELECT:
#   ym_changesets
#       changesets created after 2015-01-01, inner-joined to youthmappers on
#       uid, with hashtags split on ';', the first token of created_by, the
#       bounding-box centre point, a bing-tile quadkey (third argument 15),
#       and the bounding-box area (0 when the box is degenerate).
#   aggregated_features_per_changeset
#       per-changeset count_if tallies over planet_history rows: new/edited
#       highways, buildings and amenities, edited vertices, deletions, etc.
#   per_tile_user_day
#       those tallies summed per (uid, quadkey, _day), plus sample changeset
#       ids, hashtag lists, a created_by histogram and the overall bbox.
#   per_tile_user_day_with_ym
#       adds centroid / convex hull of the aggregated points and joins the
#       youthmappers columns back in on uid.
# The final SELECT adds km_to_university: ST_DISTANCE between the chapter
# point and the centroid, divided by 1000 (NULL when chapter coords are NULL).
youthmapper_changesets = """
WITH ym_changesets AS (
	SELECT changesets.id as changeset_id,
		changesets.uid,
		split(tags [ 'hashtags' ], ';') as hashtags,
		split(tags [ 'created_by' ], ' ') [ 1 ] as created_by,
		date(changesets.created_at) as _day,
		num_changes,
		ST_Point(
			(min_lon + max_lon) / 2,
			(min_lat + max_lat) / 2
		) as center,
		bing_tile_quadkey(
			bing_tile_at(
				(min_lat + max_lat) / 2,
				(min_lon + max_lon) / 2,
				15
			)
		) as quadkey,
		min_lat,
		min_lon,
		max_lat,
		max_lon,
		CASE
			WHEN min_lat <> max_lat
			AND min_lon <> max_lon THEN ST_AREA(
				to_spherical_geography(
					ST_ENVELOPE(
						ST_LINESTRING(
							ARRAY [ ST_Point(min_lon, min_lat),
							ST_Point(max_lon, max_lat) ]
						)
					)
				)
			) ELSE 0
		END AS area
	FROM changesets
		INNER JOIN youthmappers ON youthmappers.uid = changesets.uid
	WHERE changesets.created_at > date '2015-01-01'
),
aggregated_features_per_changeset AS (
	SELECT changeset_id,
		count_if(
			(tags [ 'highway' ] is not null)
			and (version = 1)
		) as new_highways,
		count_if(
			(tags [ 'building' ] is not null)
			and (version = 1)
		) as new_buildings,
		count_if(
			(tags [ 'amenity' ] is not null)
			and (version = 1)
		) as new_amenities,
		count_if(
			(tags [ 'highway' ] is not null)
			and (version > 1)
		) as edited_highways,
		count_if(
			(tags [ 'building' ] is not null)
			and (version > 1)
		) as edited_buildings,
		count_if(
			(tags [ 'amenity' ] is not null)
			and (version > 1)
		) as edited_amenities,
		count_if(
			type = 'node'
			and version > 1
			and cardinality(tags) = 0
			and visible = true
		) as edited_vertices,
		count_if(visible = false) as deleted_elements,
		count_if(
			visible = false
			and type = 'node'
		) as deleted_nodes,
		count_if(version > 1) as edited_elements,
		count_if(
			version > 1
			and cardinality(tags) > 0
		) as edited_features,
		count_if(version = 1) as new_features
	FROM ym_changesets
		JOIN planet_history ON ym_changesets.changeset_id = planet_history.changeset
	WHERE cardinality(planet_history.tags) > 0
		OR type IN ('way', 'relation')
		OR (
			type = 'node'
			AND version > 1
		)
	GROUP BY changeset_id
),

per_tile_user_day AS (
	SELECT sum(new_highways) as new_highways,
		sum(new_buildings) as new_buildings,
		sum(new_amenities) as new_amenities,
		sum(new_features) as new_features,
		sum(edited_highways) as edited_highways,
		sum(edited_buildings) as edited_buildings,
		sum(edited_amenities) as edited_amenities,
		sum(edited_features) as edited_features,
		sum(edited_vertices) as edited_vertices,
		sum(edited_elements) as edited_elements,
		sum(deleted_elements) as deleted_elements,
		sum(deleted_nodes) as deleted_nodes,
		sum(num_changes) as sum_edits,
		count(DISTINCT(a.changeset_id)) as changeset_count,
		arbitrary(a.changeset_id) as arbitrary_changeset,
		slice(array_agg(a.changeset_id), 1, 5) as sample_changeset_ids,
		flatten(array_agg(hashtags)) as hashtags_lists,
		histogram(created_by) as tools,
		geometry_union_agg(center) as point_agg,
		min(min_lat) as min_lat,
		min(min_lon) as min_lon,
		max(max_lat) as max_lat,
		max(max_lon) as max_lon,
		sum(area) as total_area,
		a.uid,
		a.quadkey,
		a._day
	FROM ym_changesets a
		JOIN aggregated_features_per_changeset b ON a.changeset_id = b.changeset_id
	GROUP BY uid,
		quadkey,
		_day
),
per_tile_user_day_with_ym AS (
	SELECT a._day,
		a.quadkey,
		ST_Centroid(point_agg) as centroid,
		ST_CONVEXHULL(point_agg) as convex_hull,
		new_highways,
		new_buildings,
		new_amenities,
		edited_highways,
		edited_buildings,
		edited_amenities,
		new_features,
		edited_features,
		edited_vertices,
		edited_elements,
		deleted_elements,
		deleted_nodes,
		sum_edits,
		arbitrary_changeset,
		sample_changeset_ids,
		changeset_count,
		min_lon,
		max_lon,
		min_lat,
		max_lat,
		total_area,
		cast(a.tools as json) as tools,
		cast(a.hashtags_lists as json) as hashtags,
		b.*
	FROM per_tile_user_day a
		JOIN youthmappers b ON a.uid = b.uid
)
SELECT *,
	CASE
		WHEN chapter_lon IS NOT NULL
		AND chapter_lat IS NOT NULL THEN ST_DISTANCE(
			to_spherical_geography(ST_Point(chapter_lon, chapter_lat)),
			to_spherical_geography(centroid)
		) / 1000 ELSE NULL
	END AS km_to_university
FROM per_tile_user_day_with_ym
ORDER BY _day DESC,
	uid DESC
"""

In [25]:
# Submit the changesets query and wait for it to finish. The value stored in
# `ym_changesets` is the query execution ID returned by the helper, not the
# result rows themselves.
ym_changesets = run_and_wait_for_query(query_string = youthmapper_changesets)

QUEUED .. (5)
RUNNING .. (10)
RUNNING .. (15)
RUNNING .. (20)
RUNNING .. (25)
RUNNING .. (30)
RUNNING .. (35)
RUNNING .. (40)
Status: SUCCEEDED


In [26]:
# Download the query's CSV output from S3 into a local file. The object key is
# built from the query execution ID held in `ym_changesets`.
s3 = boto3.client('s3')
results_key = f'youthmappers/query_results/{ym_changesets}.csv'
with open('youthmapper_changesets_latest.csv', 'wb') as out_file:
    s3.download_fileobj('yetigeolabs', results_key, out_file)